Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up get_unread_event_push_actions_by_room #13005

Merged
merged 20 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import attr
from prometheus_client import Counter

from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
Expand Down Expand Up @@ -1054,14 +1054,10 @@ async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> NotifCounts:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)

return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
room_id,
sync_config.user.to_string(),
)

async def generate_sync_result(
Expand Down
33 changes: 13 additions & 20 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
from typing import Dict

from synapse.api.constants import ReceiptTypes
from synapse.events import EventBase
from synapse.push.presentable_names import calculate_room_name, name_from_member_event
from synapse.storage.controllers import StorageControllers
Expand All @@ -24,30 +23,24 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
invites = await store.get_invited_rooms_for_local_user(user_id)
joins = await store.get_rooms_for_user(user_id)

my_receipts_by_room = await store.get_receipts_for_user(
user_id, (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
)

badge = len(invites)

for room_id in joins:
if room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[room_id]

notifs = await (
store.get_unread_event_push_actions_by_room_for_user(
room_id, user_id, last_unread_event_id
)
notifs = await (
store.get_unread_event_push_actions_by_room_for_user(
room_id,
user_id,
)
if notifs.notify_count == 0:
continue
)
if notifs.notify_count == 0:
continue

if group_by_room:
# return one badge count per conversation
badge += 1
else:
# increment the badge count by the number of unread messages in the room
badge += notifs.notify_count
if group_by_room:
# return one badge count per conversation
badge += 1
else:
# increment the badge count by the number of unread messages in the room
badge += notifs.notify_count
return badge
squahtx marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,14 @@ class DataStore(
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,
EventPushActionsStore,
ServerMetricsStore,
ReceiptsStore,
EndToEndKeyStore,
EndToEndRoomKeyStore,
SearchStore,
TagsStore,
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
ClientIpWorkerStore,
DeviceStore,
Expand All @@ -126,7 +127,6 @@ class DataStore(
UIAuthStore,
EventForwardExtremitiesStore,
CacheInvalidationWorkerStore,
ServerMetricsStore,
LockStore,
SessionStore,
):
Expand Down
27 changes: 13 additions & 14 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import attr

from synapse.api.constants import ReceiptTypes
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

Expand Down Expand Up @@ -119,7 +121,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st
return DEFAULT_NOTIF_ACTION


class EventPushActionsWorkerStore(SQLBaseStore):
class EventPushActionsWorkerStore(ReceiptsWorkerStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -148,12 +150,11 @@ def __init__(
self._rotate_notifs, 30 * 60 * 1000
)

@cached(num_args=3, tree=True, max_entries=5000)
@cached(tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self,
room_id: str,
user_id: str,
last_read_event_id: Optional[str],
) -> NotifCounts:
"""Get the notification count, the highlight count and the unread message count
for a given user in a given room after the given read receipt.
Expand All @@ -165,8 +166,6 @@ async def get_unread_event_push_actions_by_room_for_user(
Args:
room_id: The room to retrieve the counts in.
user_id: The user to retrieve the counts for.
last_read_event_id: The event associated with the latest read receipt for
this user in this room. None if no receipt for this user in this room.

Returns
A dict containing the counts mentioned earlier in this docstring,
Expand All @@ -178,24 +177,24 @@ async def get_unread_event_push_actions_by_room_for_user(
self._get_unread_counts_by_receipt_txn,
room_id,
user_id,
last_read_event_id,
)

def _get_unread_counts_by_receipt_txn(
self,
txn: LoggingTransaction,
room_id: str,
user_id: str,
last_read_event_id: Optional[str],
) -> NotifCounts:
stream_ordering = None
result = self.get_last_receipt_for_user_txn(
squahtx marked this conversation as resolved.
Show resolved Hide resolved
txn,
user_id,
room_id,
receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)

if last_read_event_id is not None:
stream_ordering = self.get_stream_id_for_event_txn( # type: ignore[attr-defined]
txn,
last_read_event_id,
allow_none=True,
)
stream_ordering = None
if result:
_, stream_ordering = result

if stream_ordering is None:
# Either last_read_event_id is None, or it's an event we don't have (e.g.
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ def _load_rules(
# the abstract methods being implemented.
class PushRulesWorkerStore(
ApplicationServiceWorkerStore,
ReceiptsWorkerStore,
PusherWorkerStore,
RoomMemberWorkerStore,
ReceiptsWorkerStore,
EventsWorkerStore,
SQLBaseStore,
metaclass=abc.ABCMeta,
Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,12 @@ def invalidate_caches_for_receipt(
self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
self._get_linearized_receipts_for_room.invalidate((room_id,))

# We use this method to invalidate so that we don't end up with circular
# dependencies between the receipts and push action stores.
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)

def process_replication_rows(
self,
stream_name: str,
Expand Down
14 changes: 11 additions & 3 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from canonicaljson import encode_canonical_json

from synapse.api.constants import ReceiptTypes
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
from synapse.handlers.room import RoomEventSource
Expand Down Expand Up @@ -164,9 +165,16 @@ def test_push_actions_for_user(self):
)
event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
self.replicate()

self.get_success(
self.master_store.insert_receipt(
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {}
)
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
[ROOM_ID, USER_ID_2],
NotifCounts(highlight_count=0, unread_count=0, notify_count=0),
)

Expand All @@ -179,7 +187,7 @@ def test_push_actions_for_user(self):
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
[ROOM_ID, USER_ID_2],
NotifCounts(highlight_count=0, unread_count=0, notify_count=1),
)

Expand All @@ -194,7 +202,7 @@ def test_push_actions_for_user(self):
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
[ROOM_ID, USER_ID_2],
NotifCounts(highlight_count=1, unread_count=0, notify_count=2),
)

Expand Down