Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve performance of getting unread counts in rooms #13119

Merged
merged 6 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13119.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.
3 changes: 3 additions & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ def get_reactor(self) -> ISynapseReactor:
def get_instance_name(self) -> str:
return "master"

def should_send_federation(self) -> bool:
return False


class Porter:
def __init__(
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class DataStore(
RoomStore,
RoomBatchStore,
RegistrationStore,
StreamWorkerStore,
ProfileStore,
PresenceStore,
TransactionWorkerStore,
Expand All @@ -112,6 +111,7 @@ class DataStore(
SearchStore,
TagsStore,
AccountDataStore,
StreamWorkerStore,
OpenIdStore,
ClientIpWorkerStore,
DeviceStore,
Expand Down
16 changes: 13 additions & 3 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

Expand Down Expand Up @@ -122,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st
return DEFAULT_NOTIF_ACTION


class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore):
class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -218,7 +218,7 @@ def _get_unread_counts_by_receipt_txn(
retcol="event_id",
)

stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined]
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)

return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, stream_ordering
Expand Down Expand Up @@ -291,12 +291,22 @@ def _get_notif_unread_count_for_user_room(
actions that have been deleted from `event_push_actions` table.
"""

# If there have been no events in the room since the stream ordering,
# there can't be any push actions either.
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
return 0, 0

clause = ""
args = [user_id, room_id, stream_ordering]
if max_stream_ordering is not None:
clause = "AND ea.stream_ordering <= ?"
args.append(max_stream_ordering)

# If the max stream ordering is less than the min stream ordering,
# then obviously there are zero push actions in that range.
if max_stream_ordering <= stream_ordering:
return 0, 0

sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
Expand Down
20 changes: 20 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
Set,
Tuple,
cast,
overload,
)

import attr
from frozendict import frozendict
from typing_extensions import Literal

from twisted.internet import defer

Expand Down Expand Up @@ -795,6 +797,24 @@ async def get_current_room_stream_token_for_room_id(
)
return RoomStreamToken(topo, stream_ordering)

@overload
def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,
event_id: str,
allow_none: Literal[False] = False,
) -> int:
...

@overload
def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,
event_id: str,
allow_none: bool = False,
) -> Optional[int]:
...

def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,
Expand Down
2 changes: 2 additions & 0 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def _inject_actions(stream: int, action: list) -> None:
event.internal_metadata.is_outlier.return_value = False
event.depth = stream

self.store._events_stream_cache.entity_has_changed(room_id, stream)

self.get_success(
self.store.add_push_actions_to_staging(
event.event_id,
Expand Down