From 1259832491dc3e31a41baf16b48e4783367d6d1f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 14:07:35 +0100 Subject: [PATCH 1/6] Improve performance of getting unread counts in rooms --- synapse/storage/databases/main/__init__.py | 2 +- .../storage/databases/main/event_push_actions.py | 13 ++++++++++++- tests/storage/test_event_push_actions.py | 2 ++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 57aaf778ec2a..a3d31d373724 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -87,7 +87,6 @@ class DataStore( RoomStore, RoomBatchStore, RegistrationStore, - StreamWorkerStore, ProfileStore, PresenceStore, TransactionWorkerStore, @@ -112,6 +111,7 @@ class DataStore( SearchStore, TagsStore, AccountDataStore, + StreamWorkerStore, OpenIdStore, ClientIpWorkerStore, DeviceStore, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 10a7962382f7..d0916e7874b9 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -27,6 +27,7 @@ ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore +from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -122,7 +123,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st return DEFAULT_NOTIF_ACTION -class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore): +class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -291,12 +292,22 @@ def _get_notif_unread_count_for_user_room( actions that have been deleted from `event_push_actions` table. """ + # If there have been no events in the room since the stream ordering, + # there can't be any push actions either. + if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering): + return 0, 0 + clause = "" args = [user_id, room_id, stream_ordering] if max_stream_ordering is not None: clause = "AND ea.stream_ordering <= ?" args.append(max_stream_ordering) + # If the max stream ordering is less than the min stream ordering, + # then obviously there are zero push actions in that range. + if max_stream_ordering < stream_ordering: + return 0, 0 + sql = f""" SELECT COUNT(CASE WHEN notif = 1 THEN 1 END), diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 2ac5f6db5e74..e13401c44b41 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -86,6 +86,8 @@ def _inject_actions(stream: int, action: list) -> None: event.internal_metadata.is_outlier.return_value = False event.depth = stream + self.store._events_stream_cache.entity_has_changed(room_id, stream) + self.get_success( self.store.add_push_actions_to_staging( event.event_id, From ad94c6b629edd8cd3a31b0b386f7496bf53f3ce2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 14:08:37 +0100 Subject: [PATCH 2/6] Newsfile --- changelog.d/13119.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13119.misc diff --git a/changelog.d/13119.misc b/changelog.d/13119.misc new file mode 100644 index 000000000000..3bb51962e79e --- /dev/null +++ b/changelog.d/13119.misc @@ -0,0 +1 @@ +Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room. From 8845dfdeaa43ddade14759a17dde8dffa5bcf0cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 14:28:17 +0100 Subject: [PATCH 3/6] Fix lint --- .../databases/main/event_push_actions.py | 3 +- synapse/storage/databases/main/stream.py | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index d0916e7874b9..9bdfa2d091b9 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -25,7 +25,6 @@ LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.util import json_encoder @@ -219,7 +218,7 @@ def _get_unread_counts_by_receipt_txn( retcol="event_id", ) - stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined] + stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) return self._get_unread_counts_by_pos_txn( txn, room_id, user_id, stream_ordering diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8e88784d3ce3..f5fd6f389956 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -46,10 +46,12 @@ Set, Tuple, cast, + overload, ) import attr from frozendict import frozendict +from typing_extensions import Literal from twisted.internet import defer @@ -795,6 +797,32 @@ async def get_current_room_stream_token_for_room_id( ) return RoomStreamToken(topo, stream_ordering) + @overload + def get_stream_id_for_event_txn( + self, + txn: LoggingTransaction, + event_id: str, + allow_none: Literal[True], + ) -> int: + ... + + @overload + def get_stream_id_for_event_txn( + self, + txn: LoggingTransaction, + event_id: str, + ) -> int: + ... + + @overload + def get_stream_id_for_event_txn( + self, + txn: LoggingTransaction, + event_id: str, + allow_none: bool = False, + ) -> Optional[int]: + ... + def get_stream_id_for_event_txn( self, txn: LoggingTransaction, From 818463fa1c3e115bdfae8b63456888fc15eefd2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 15:40:36 +0100 Subject: [PATCH 4/6] Fix port DB script --- synapse/_scripts/synapse_port_db.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 9c06c837dc1d..f3f9c6d54c7b 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -270,6 +270,9 @@ def get_reactor(self) -> ISynapseReactor: def get_instance_name(self) -> str: return "master" + def should_send_federation(self) -> bool: + return False + class Porter: def __init__( From 5e4f5dbd95521e7e7ebc85cbdeb82cd54096d658 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Jun 2022 11:01:00 +0100 Subject: [PATCH 5/6] Update synapse/storage/databases/main/event_push_actions.py Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 9bdfa2d091b9..747c6560ec70 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -304,7 +304,7 @@ def _get_notif_unread_count_for_user_room( # If the max stream ordering is less than the min stream ordering, # then obviously there are zero push actions in that range. - if max_stream_ordering < stream_ordering: + if max_stream_ordering <= stream_ordering: return 0, 0 sql = f""" From cf10193a805607f5ef51466a1c6ba9e700b9cb42 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Jun 2022 11:03:01 +0100 Subject: [PATCH 6/6] Fix typing --- synapse/storage/databases/main/stream.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index f5fd6f389956..3a1df7776c90 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -802,15 +802,7 @@ def get_stream_id_for_event_txn( self, txn: LoggingTransaction, event_id: str, - allow_none: Literal[True], - ) -> int: - ... - - @overload - def get_stream_id_for_event_txn( - self, - txn: LoggingTransaction, - event_id: str, + allow_none: Literal[False] = False, ) -> int: ...