This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve performance of getting unread counts in rooms #13119

Merged: 6 commits, merged on Jun 29, 2022
Changes from 4 commits
1 change: 1 addition & 0 deletions changelog.d/13119.misc
@@ -0,0 +1 @@
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.
3 changes: 3 additions & 0 deletions synapse/_scripts/synapse_port_db.py
@@ -270,6 +270,9 @@ def get_reactor(self) -> ISynapseReactor:
    def get_instance_name(self) -> str:
        return "master"

    def should_send_federation(self) -> bool:
        return False

class Porter:
    def __init__(
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/__init__.py
@@ -87,7 +87,6 @@ class DataStore(
    RoomStore,
    RoomBatchStore,
    RegistrationStore,
    StreamWorkerStore,
    ProfileStore,
    PresenceStore,
    TransactionWorkerStore,
@@ -112,6 +111,7 @@ class DataStore(
    SearchStore,
    TagsStore,
    AccountDataStore,
    StreamWorkerStore,
    OpenIdStore,
    ClientIpWorkerStore,
    DeviceStore,
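The only change in this file is moving `StreamWorkerStore` further down `DataStore`'s base-class list. A plausible reason, not stated in the diff itself: `EventPushActionsWorkerStore` now inherits from `StreamWorkerStore`, and Python's C3 linearization rejects any bases list in which a class appears before one of its own subclasses. A minimal, self-contained sketch with simplified stand-in classes (not the real Synapse stores):

# Simplified stand-ins; these are not the real Synapse classes.
class StreamWorkerStore:
    """Pretend base store providing stream helpers."""


class EventPushActionsWorkerStore(StreamWorkerStore):
    """Pretend store that (as of this PR) derives from StreamWorkerStore."""


# Fine: the derived mixin is listed before its base, so C3 can order them.
class DataStoreOk(EventPushActionsWorkerStore, StreamWorkerStore):
    pass


# Broken: listing the base before its own subclass makes a consistent MRO impossible.
try:

    class DataStoreBroken(StreamWorkerStore, EventPushActionsWorkerStore):
        pass

except TypeError as exc:
    print(f"MRO error: {exc}")
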
16 changes: 13 additions & 3 deletions synapse/storage/databases/main/event_push_actions.py
@@ -25,8 +25,8 @@
    LoggingDatabaseConnection,
    LoggingTransaction,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

@@ -122,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st
        return DEFAULT_NOTIF_ACTION


class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore):
class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore):
    def __init__(
        self,
        database: DatabasePool,
@@ -218,7 +218,7 @@ def _get_unread_counts_by_receipt_txn(
retcol="event_id",
)

stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined]
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)

return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, stream_ordering
@@ -291,12 +291,22 @@ def _get_notif_unread_count_for_user_room(
            actions that have been deleted from `event_push_actions` table.
        """

        # If there have been no events in the room since the stream ordering,
        # there can't be any push actions either.
        if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
            return 0, 0

        clause = ""
        args = [user_id, room_id, stream_ordering]
        if max_stream_ordering is not None:
            clause = "AND ea.stream_ordering <= ?"
            args.append(max_stream_ordering)

            # If the max stream ordering is less than the min stream ordering,
            # then obviously there are zero push actions in that range.
            if max_stream_ordering < stream_ordering:
                return 0, 0

sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
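The key optimization in this file is the `_events_stream_cache` check: if the room has not seen any events since the receipt's stream ordering, the expensive COUNT query can be skipped entirely. The sketch below illustrates the idea with a simplified stand-in for Synapse's stream change cache; the class, helper function, and room ID are illustrative assumptions, not the real implementation (only the method names `has_entity_changed` / `entity_has_changed` mirror the calls in the diff):

from typing import Dict, Tuple


class SimpleStreamChangeCache:
    """Simplified stand-in: remembers the latest stream position at which each
    entity (here, a room ID) changed."""

    def __init__(self, earliest_known_stream_pos: int) -> None:
        # Below this position the cache has no information, so it must answer
        # "maybe changed" to stay correct.
        self._earliest_known_stream_pos = earliest_known_stream_pos
        self._latest_change: Dict[str, int] = {}

    def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        """Record that `entity` changed at `stream_pos`."""
        current = self._latest_change.get(entity, self._earliest_known_stream_pos)
        self._latest_change[entity] = max(current, stream_pos)

    def has_entity_changed(self, entity: str, stream_pos: int) -> bool:
        """Return True if `entity` may have changed after `stream_pos`."""
        if stream_pos < self._earliest_known_stream_pos:
            return True  # too far back to know; be conservative
        return self._latest_change.get(entity, self._earliest_known_stream_pos) > stream_pos


def unread_counts(cache: SimpleStreamChangeCache, room_id: str, stream_ordering: int) -> Tuple[int, int]:
    """Mimics the short-circuit added to _get_notif_unread_count_for_user_room."""
    if not cache.has_entity_changed(room_id, stream_ordering):
        # No events in the room since `stream_ordering`, so there cannot be
        # any newer push actions: skip the database entirely.
        return 0, 0
    # Otherwise the real code runs the COUNT(...) query shown in the diff.
    raise NotImplementedError("would run the SQL count here")


cache = SimpleStreamChangeCache(earliest_known_stream_pos=0)
cache.entity_has_changed("!room:example.org", stream_pos=10)

# The reader's receipt is already at the latest event, so no query is needed.
print(unread_counts(cache, "!room:example.org", stream_ordering=10))  # (0, 0)
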
28 changes: 28 additions & 0 deletions synapse/storage/databases/main/stream.py
@@ -46,10 +46,12 @@
    Set,
    Tuple,
    cast,
    overload,
)

import attr
from frozendict import frozendict
from typing_extensions import Literal

from twisted.internet import defer

@@ -795,6 +797,32 @@ async def get_current_room_stream_token_for_room_id(
        )
        return RoomStreamToken(topo, stream_ordering)

    @overload
    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
        allow_none: Literal[True],
    ) -> int:
        ...

    @overload
    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
    ) -> int:
        ...
Contributor commented:

Can we get away with one fewer overload if we include the default value for allow_none?

Suggested change (replace the two overloads above with a single one that includes the default):

    @overload
    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
        allow_none: Literal[True] = True,
    ) -> int:
        ...

Member (author) replied:

Err, yes. I got confused and these type hints are actually wrong (allow_none=True should return an Optional)


    @overload
    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
        event_id: str,
        allow_none: bool = False,
    ) -> Optional[int]:
        ...

    def get_stream_id_for_event_txn(
        self,
        txn: LoggingTransaction,
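Stepping back from the diff, here is a minimal, self-contained sketch of the overload pattern the review thread converges on: a `Literal[False]` default overload that keeps the plain `int` return type, plus a `Literal[True]` overload returning `Optional[int]`. It uses `typing.Literal` (Python 3.8+) instead of `typing_extensions`, and the free function `get_stream_id_for_event` and `_FAKE_INDEX` lookup are illustrative assumptions, not Synapse's store method:

from typing import Literal, Optional, overload

# Stand-in for the events table: event_id -> stream ordering.
_FAKE_INDEX = {"$known:example.org": 42}


@overload
def get_stream_id_for_event(event_id: str, allow_none: Literal[False] = ...) -> int:
    ...


@overload
def get_stream_id_for_event(event_id: str, allow_none: Literal[True]) -> Optional[int]:
    ...


def get_stream_id_for_event(event_id: str, allow_none: bool = False) -> Optional[int]:
    """Look up the stream ordering for an event, raising unless allow_none=True."""
    stream_id = _FAKE_INDEX.get(event_id)
    if stream_id is None and not allow_none:
        raise KeyError(event_id)
    return stream_id


known = get_stream_id_for_event("$known:example.org")  # type checks as int
missing = get_stream_id_for_event("$missing:example.org", allow_none=True)  # Optional[int]
print(known, missing)  # 42 None

With this shape, callers that omit `allow_none` keep a non-optional `int`, while callers that pass `allow_none=True` are forced to handle `None`, which is the behaviour the author says the original hints got wrong.
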
2 changes: 2 additions & 0 deletions tests/storage/test_event_push_actions.py
@@ -86,6 +86,8 @@ def _inject_actions(stream: int, action: list) -> None:
            event.internal_metadata.is_outlier.return_value = False
            event.depth = stream

            self.store._events_stream_cache.entity_has_changed(room_id, stream)

            self.get_success(
                self.store.add_push_actions_to_staging(
                    event.event_id,