Skip to content

Commit

Permalink
feat(ui): TimelineInnerStateLock::lock is replaced by read and `w…
Browse files Browse the repository at this point in the history
…rite`.

This patch splits the `TimelineInnerStateLock::lock` method into
`::read` and `::write`. The idea is that a read-only lock doesn't
hold a clone of the lock release observer (`lock_release_ob:
SharedObservable<()>`), it will not notify the observer. Then it's only
the write lock that holds a clone of the lock release observer, and will
notify it.

This patch updates the code accordingly as best as possible.
  • Loading branch information
Hywan committed Aug 31, 2023
1 parent 95e0414 commit 6278bc0
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 57 deletions.
59 changes: 30 additions & 29 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use super::{
mod state;

pub(super) use self::state::TimelineInnerState;
use self::state::{TimelineInnerStateLock, TimelineInnerStateLockGuard};
use self::state::{TimelineInnerStateLock, TimelineInnerStateWriteGuard};

#[derive(Clone, Debug)]
pub(super) struct TimelineInner<P: RoomDataProvider = Room> {
Expand Down Expand Up @@ -145,7 +145,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
///
/// Cheap because `im::Vector` is cheap to clone.
pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
self.state.lock().await.items.clone()
self.state.read().await.items.clone()
}

pub(super) fn subscribe(&self) -> impl Stream<Item = VectorDiff<Arc<TimelineItem>>> {
Expand All @@ -158,7 +158,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {

loop {
let (timeline_items, timeline_stream) = {
let state = state.lock().await;
let state = state.read().await;

(state.items.clone(), state.items.subscribe())
};
Expand All @@ -175,7 +175,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
let _ = ignore_user_list_stream.next().await;

// A user has been ignored/unignored? Let's clear the timeline.
state.lock().await.clear();
state.write().await.clear();
}
}
.switch()
Expand All @@ -196,15 +196,15 @@ impl<P: RoomDataProvider> TimelineInner<P> {
F: Fn(Arc<TimelineItem>) -> Option<U>,
{
trace!("Creating timeline items signal");
let state = self.state.lock().await;
let state = self.state.read().await;
state.items.subscribe_filter_map(f)
}

pub(super) async fn toggle_reaction_local(
&self,
annotation: &Annotation,
) -> Result<ReactionAction, super::Error> {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;

let user_id = self.room_data_provider.own_user_id();

Expand Down Expand Up @@ -325,7 +325,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
) {
let own_user_id = self.room_data_provider.own_user_id().to_owned();
self.state
.lock()
.write()
.await
.users_read_receipts
.entry(own_user_id)
Expand All @@ -341,7 +341,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {

debug!("Adding {} initial events", events.len());

let mut state = self.state.lock().await;
let mut state = self.state.write().await;
for event in events {
state
.handle_remote_event(
Expand All @@ -356,12 +356,12 @@ impl<P: RoomDataProvider> TimelineInner<P> {

pub(super) async fn clear(&self) {
trace!("Clearing timeline");
self.state.lock().await.clear();
self.state.write().await.clear();
}

#[instrument(skip_all)]
pub(super) async fn handle_joined_room_update(&self, update: JoinedRoom) {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;
state.handle_sync_timeline(update.timeline, &self.room_data_provider, &self.settings).await;

trace!("Handling account data");
Expand Down Expand Up @@ -396,7 +396,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {

pub(super) async fn handle_sync_timeline(&self, timeline: Timeline) {
self.state
.lock()
.write()
.await
.handle_sync_timeline(timeline, &self.room_data_provider, &self.settings)
.await;
Expand All @@ -405,7 +405,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
#[cfg(test)]
pub(super) async fn handle_live_event(&self, event: SyncTimelineEvent) {
self.state
.lock()
.write()
.await
.handle_live_event(event, &self.room_data_provider, &self.settings)
.await;
Expand All @@ -421,7 +421,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
let sender = self.room_data_provider.own_user_id().to_owned();
let profile = self.room_data_provider.profile(&sender).await;

let mut state = self.state.lock().await;
let mut state = self.state.write().await;
state.handle_local_event(sender, profile, txn_id, content, &self.settings);
}

Expand All @@ -436,7 +436,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
let sender = self.room_data_provider.own_user_id().to_owned();
let profile = self.room_data_provider.profile(&sender).await;

let mut state = self.state.lock().await;
let mut state = self.state.write().await;
state.handle_local_redaction(sender, profile, txn_id, to_redact, content, &self.settings);
}

Expand All @@ -449,7 +449,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
txn_id: &TransactionId,
send_state: EventSendState,
) {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;

let new_event_id: Option<&EventId> = match &send_state {
EventSendState::Sent { event_id } => Some(event_id),
Expand Down Expand Up @@ -550,7 +550,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
annotation: &Annotation,
result: &ReactionToggleResult,
) -> Result<ReactionAction, super::Error> {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;
let user_id = self.room_data_provider.own_user_id();
let annotation_key: AnnotationKey = annotation.into();

Expand Down Expand Up @@ -592,7 +592,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
&self,
txn_id: &TransactionId,
) -> Option<TimelineItemContent> {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;

let (idx, item) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))?;
let local_item = item.as_local()?;
Expand All @@ -618,7 +618,8 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}

pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;

if let Some((idx, _)) =
rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
{
Expand All @@ -639,7 +640,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
&self,
events: Vec<TimelineEvent>,
) -> Option<HandleManyEventsResult> {
let mut state = self.state.lock().await;
let mut state = self.state.write().await;

let mut total = HandleManyEventsResult::default();
for event in events {
Expand All @@ -660,7 +661,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}

pub(super) async fn set_fully_read_event(&self, fully_read_event_id: OwnedEventId) {
self.state.lock().await.set_fully_read_event(fully_read_event_id)
self.state.write().await.set_fully_read_event(fully_read_event_id)
}

#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -691,7 +692,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
) {
use super::EncryptedMessage;

let mut state = self.state.clone().lock_owned().await;
let mut state = self.state.clone().write_owned().await;

let should_retry = move |session_id: &str| {
if let Some(session_ids) = &session_ids {
Expand Down Expand Up @@ -821,7 +822,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}

async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
self.state.lock().await.items.for_each(|mut entry| {
self.state.write().await.items.for_each(|mut entry| {
let Some(event_item) = entry.as_event() else { return };
if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
let new_item = entry.with_kind(TimelineItemKind::Event(
Expand All @@ -835,7 +836,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
pub(super) async fn update_sender_profiles(&self) {
trace!("Updating sender profiles");

let mut state = self.state.lock().await;
let mut state = self.state.write().await;
let mut entries = state.items.entries();
while let Some(mut entry) = entries.next() {
let Some(event_item) = entry.as_event() else { continue };
Expand Down Expand Up @@ -875,7 +876,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
let own_user_id = self.room_data_provider.own_user_id();
self.state.lock().await.handle_explicit_read_receipts(receipt_event_content, own_user_id);
self.state.write().await.handle_explicit_read_receipts(receipt_event_content, own_user_id);
}
}

Expand Down Expand Up @@ -914,7 +915,7 @@ impl TimelineInner {
&self,
event_id: &EventId,
) -> Result<(), super::Error> {
let state = self.state.lock().await;
let state = self.state.write().await;
let (index, item) = rfind_event_by_id(&state.items, event_id)
.ok_or(super::Error::RemoteEventNotInTimeline)?;
let remote_item = item.as_remote().ok_or(super::Error::RemoteEventNotInTimeline)?.clone();
Expand Down Expand Up @@ -949,7 +950,7 @@ impl TimelineInner {

// We need to be sure to have the latest position of the event as it might have
// changed while waiting for the request.
let mut state = self.state.lock().await;
let mut state = self.state.write().await;
let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
.ok_or(super::Error::RemoteEventNotInTimeline)?;

Expand Down Expand Up @@ -985,7 +986,7 @@ impl TimelineInner {
&self,
user_id: &UserId,
) -> Option<(OwnedEventId, Receipt)> {
let state = self.state.lock().await;
let state = self.state.read().await;
let room = self.room();

state.latest_user_read_receipt(user_id, room).await
Expand All @@ -1006,7 +1007,7 @@ impl TimelineInner {
}

let own_user_id = self.room().own_user_id();
let state = self.state.lock().await;
let state = self.state.read().await;
let room = self.room();

match receipt_type {
Expand Down Expand Up @@ -1060,7 +1061,7 @@ pub(super) struct HandleManyEventsResult {
}

async fn fetch_replied_to_event(
mut state: TimelineInnerStateLockGuard<'_>,
mut state: TimelineInnerStateWriteGuard<'_>,
index: usize,
item: &EventTimelineItem,
message: &Message,
Expand Down
42 changes: 23 additions & 19 deletions crates/matrix-sdk-ui/src/timeline/inner/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId,
UserId,
};
use tokio::sync::{Mutex, MutexGuard, OwnedMutexGuard};
use tokio::sync::{OwnedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{debug, error, instrument, trace, warn};

use super::{ReactionState, TimelineInnerSettings};
Expand All @@ -58,29 +58,33 @@ use crate::{

#[derive(Clone)]
pub(in crate::timeline) struct TimelineInnerStateLock {
inner: Arc<Mutex<TimelineInnerState>>,
inner: Arc<RwLock<TimelineInnerState>>,
lock_release_ob: SharedObservable<()>,
}

impl TimelineInnerStateLock {
pub(super) fn new(state: TimelineInnerState) -> Self {
Self { inner: Arc::new(Mutex::new(state)), lock_release_ob: Default::default() }
Self { inner: Arc::new(RwLock::new(state)), lock_release_ob: Default::default() }
}

pub(super) fn subscribe_lock_release(&self) -> Subscriber<()> {
self.lock_release_ob.subscribe()
}

pub async fn lock(&self) -> TimelineInnerStateLockGuard<'_> {
TimelineInnerStateLockGuard {
inner: self.inner.lock().await,
pub async fn read(&self) -> RwLockReadGuard<'_, TimelineInnerState> {
self.inner.read().await
}

pub async fn write(&self) -> TimelineInnerStateWriteGuard<'_> {
TimelineInnerStateWriteGuard {
inner: self.inner.write().await,
lock_release_ob: self.lock_release_ob.clone(),
}
}

pub async fn lock_owned(&self) -> TimelineInnerStateOwnedLockGuard {
TimelineInnerStateOwnedLockGuard {
inner: self.inner.clone().lock_owned().await,
pub async fn write_owned(&self) -> TimelineInnerStateOwnedWriteGuard {
TimelineInnerStateOwnedWriteGuard {
inner: self.inner.clone().write_owned().await,
lock_release_ob: self.lock_release_ob.clone(),
}
}
Expand Down Expand Up @@ -477,51 +481,51 @@ impl TimelineInnerState {
}
}

pub(in crate::timeline) struct TimelineInnerStateLockGuard<'a> {
inner: MutexGuard<'a, TimelineInnerState>,
pub(in crate::timeline) struct TimelineInnerStateWriteGuard<'a> {
inner: RwLockWriteGuard<'a, TimelineInnerState>,
lock_release_ob: SharedObservable<()>,
}

impl Deref for TimelineInnerStateLockGuard<'_> {
impl Deref for TimelineInnerStateWriteGuard<'_> {
type Target = TimelineInnerState;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl DerefMut for TimelineInnerStateLockGuard<'_> {
impl DerefMut for TimelineInnerStateWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl Drop for TimelineInnerStateLockGuard<'_> {
impl Drop for TimelineInnerStateWriteGuard<'_> {
fn drop(&mut self) {
self.lock_release_ob.set(());
}
}

pub(in crate::timeline) struct TimelineInnerStateOwnedLockGuard {
inner: OwnedMutexGuard<TimelineInnerState>,
pub(in crate::timeline) struct TimelineInnerStateOwnedWriteGuard {
inner: OwnedRwLockWriteGuard<TimelineInnerState>,
lock_release_ob: SharedObservable<()>,
}

impl Deref for TimelineInnerStateOwnedLockGuard {
impl Deref for TimelineInnerStateOwnedWriteGuard {
type Target = TimelineInnerState;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl DerefMut for TimelineInnerStateOwnedLockGuard {
impl DerefMut for TimelineInnerStateOwnedWriteGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl Drop for TimelineInnerStateOwnedLockGuard {
impl Drop for TimelineInnerStateOwnedWriteGuard {
fn drop(&mut self) {
self.lock_release_ob.set(());
}
Expand Down
12 changes: 3 additions & 9 deletions crates/matrix-sdk-ui/tests/integration/timeline/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,10 @@ async fn batched() {

let hdl = tokio::spawn(async move {
pin_mut!(timeline_stream);

let next_batch = timeline_stream.next().await.unwrap();
// One `VectorDiff::Reset`…
assert_eq!(next_batch.len(), 1);
assert_matches!(&next_batch[0], VectorDiff::Reset { values } => {
// … which is empty.
assert_eq!(values.len(), 0);
});
let next_batch = timeline_stream.next().await.unwrap();
// One day divider, and three event items.
assert_eq!(next_batch.len(), 4);
// One `VectorDiff::Reset` + one day divider + three event items.
assert_eq!(next_batch.len(), 5);
});

ev_builder.add_joined_room(
Expand Down

0 comments on commit 6278bc0

Please sign in to comment.