From 52b0ad023a39625bd9ab4270ec43f47cac59daae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 11:59:24 +0100 Subject: [PATCH 01/14] Handle receipts async --- .../databases/main/event_push_actions.py | 135 ++++++++++++------ synapse/storage/databases/main/receipts.py | 13 +- .../delta/72/01event_push_summary_receipt.sql | 32 +++++ tests/storage/test_event_push_actions.py | 33 ++++- 4 files changed, 156 insertions(+), 57 deletions(-) create mode 100644 synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 10a7962382f7..311ca8c90c9b 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -233,14 +233,25 @@ def _get_unread_counts_by_pos_txn( counts = NotifCounts() - # First we pull the counts from the summary table + # First we pull the counts from the summary table. + # + # We check that `last_receipt_stream_ordering` matches the stream + # ordering given, if it doesn't then a new read receipt hasn't been + # handled yet and so we the data in the table is stale. + # + # If `last_receipt_stream_ordering` is null then that means its up to + # date. txn.execute( """ SELECT stream_ordering, notif_count, COALESCE(unread_count, 0) FROM event_push_summary - WHERE room_id = ? AND user_id = ? AND stream_ordering > ? + WHERE room_id = ? AND user_id = ? + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > ?) + OR last_receipt_stream_ordering = ? + ) """, - (room_id, user_id, stream_ordering), + (room_id, user_id, stream_ordering, stream_ordering), ) row = txn.fetchone() @@ -800,6 +811,18 @@ async def _rotate_notifs(self) -> None: self._doing_notif_rotation = True try: + # First we handle any new receipts that have happened. + while True: + logger.info("Handling new receipts") + + caught_up = await self.db_pool.runInteraction( + "_handle_new_receipts_for_notifs_txn", + self._handle_new_receipts_for_notifs_txn, + ) + if caught_up: + break + + # Then we update the event push summaries for any new events while True: logger.info("Rotating notifications") @@ -810,10 +833,78 @@ async def _rotate_notifs(self) -> None: break await self.hs.get_clock().sleep(self._rotate_delay) + # Finally we clear out old event push actions. await self._remove_old_push_actions_that_have_rotated() finally: self._doing_notif_rotation = False + def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: + """Check for new read receipts and delete from event push actions.""" + + limit = 100 + + sql = """ + SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering + FROM receipts_linearized AS r, event_push_summary_last_receipt_stream_id AS eps, events AS e + WHERE r.stream_id > eps.stream_id AND r.event_id = e.event_id + ORDER BY r.stream_id ASC + LIMIT ? + """ + + txn.execute(sql, (limit,)) + rows = txn.fetchall() + + if not rows: + return True + + # For each new read receipt we delete push actions from before it and + # recalculate the summary. + for _, room_id, user_id, stream_ordering in rows: + txn.execute( + """ + DELETE FROM event_push_actions + WHERE room_id = ? + AND user_id = ? + AND stream_ordering <= ? 
+ AND highlight = 0 + """, + (room_id, user_id, stream_ordering), + ) + + old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + notif_count, unread_count = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering + ) + + self.db_pool.simple_upsert_txn( + txn, + table="event_push_summary", + keyvalues={"room_id": room_id, "user_id": user_id}, + values={ + "notif_count": notif_count, + "unread_count": unread_count, + "stream_ordering": old_rotate_stream_ordering, + "last_receipt_stream_ordering": stream_ordering, + }, + ) + + last_stream_id = rows[-1][0] + + self.db_pool.simple_update_one_txn( + txn, + table="event_push_summary_last_receipt_stream_id", + keyvalues={}, + updatevalues={"stream_id": last_stream_id}, + ) + + return len(rows) < limit + def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool: """Archives older notifications into event_push_summary. Returns whether the archiving process has caught up or not. @@ -1055,44 +1146,6 @@ def _remove_old_push_actions_before_txn( (room_id, user_id), ) - # We need to join on the events table to get the received_ts for - # event_push_actions and sqlite won't let us use a join in a delete so - # we can't just delete where received_ts < x. Furthermore we can - # only identify event_push_actions by a tuple of room_id, event_id - # we we can't use a subquery. - # Instead, we look up the stream ordering for the last event in that - # room received before the threshold time and delete event_push_actions - # in the room with a stream_odering before that. - txn.execute( - "DELETE FROM event_push_actions " - " WHERE user_id = ? AND room_id = ? AND " - " stream_ordering <= ?" - " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", - (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), - ) - - old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( - txn, - table="event_push_summary_stream_ordering", - keyvalues={}, - retcol="stream_ordering", - ) - - notif_count, unread_count = self._get_notif_unread_count_for_user_room( - txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering - ) - - self.db_pool.simple_upsert_txn( - txn, - table="event_push_summary", - keyvalues={"room_id": room_id, "user_id": user_id}, - values={ - "notif_count": notif_count, - "unread_count": unread_count, - "stream_ordering": old_rotate_stream_ordering, - }, - ) - class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index bec6d60577b5..0090c9f22512 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -26,7 +26,7 @@ cast, ) -from synapse.api.constants import EduTypes, ReceiptTypes +from synapse.api.constants import EduTypes from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import ReceiptsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -682,17 +682,6 @@ def _insert_linearized_receipt_txn( lock=False, ) - # When updating a local users read receipt, remove any push actions - # which resulted from the receipt's event and all earlier events. 
- if ( - self.hs.is_mine_id(user_id) - and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) - and stream_ordering is not None - ): - self._remove_old_push_actions_before_txn( # type: ignore[attr-defined] - txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering - ) - return rx_ts def _graph_to_linear( diff --git a/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql b/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql new file mode 100644 index 000000000000..063019471e6d --- /dev/null +++ b/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql @@ -0,0 +1,32 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column that records the position of the read receipt for the user at +-- the time we summarised the push actions. This is used to check if the counts +-- are up to date after a new read receipt has been sent. Null means that we can +-- skip that check. +ALTER TABLE event_push_summary ADD COLUMN last_receipt_stream_ordering BIGINT; + + +-- Tracks which new receipts we've handled +CREATE TABLE event_push_summary_last_receipt_stream_id ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. 
+ stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO event_push_summary_last_receipt_stream_id (stream_id) + SELECT COALESCE(MAX(stream_id), 0) + FROM receipts_linearized; diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 2ac5f6db5e74..e10624767ce5 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -81,11 +81,26 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None: def _inject_actions(stream: int, action: list) -> None: event = Mock() event.room_id = room_id - event.event_id = "$test:example.com" + event.event_id = f"$test{stream}:example.com" event.internal_metadata.stream_ordering = stream event.internal_metadata.is_outlier.return_value = False event.depth = stream + self.get_success( + self.store.db_pool.simple_insert( + table="events", + values={ + "stream_ordering": stream, + "topological_ordering": stream, + "type": "m.room.message", + "room_id": room_id, + "processed": True, + "outlier": False, + "event_id": event.event_id, + }, + ) + ) + self.get_success( self.store.add_push_actions_to_staging( event.event_id, @@ -105,18 +120,28 @@ def _inject_actions(stream: int, action: list) -> None: def _rotate(stream: int) -> None: self.get_success( self.store.db_pool.runInteraction( - "", self.store._rotate_notifs_before_txn, stream + "rotate-receipts", self.store._handle_new_receipts_for_notifs_txn + ) + ) + + self.get_success( + self.store.db_pool.runInteraction( + "rotate-notifs", self.store._rotate_notifs_before_txn, stream ) ) def _mark_read(stream: int, depth: int) -> None: last_read_stream_ordering[0] = stream + self.get_success( self.store.db_pool.runInteraction( "", - self.store._remove_old_push_actions_before_txn, + self.store._insert_linearized_receipt_txn, room_id, + "m.read", user_id, + f"$test{stream}:example.com", + {}, stream, ) ) @@ -150,7 +175,7 @@ def _mark_read(stream: int, depth: int) -> None: _assert_counts(1, 0) - _mark_read(7, 7) + _mark_read(6, 6) _assert_counts(0, 0) _inject_actions(8, HIGHLIGHT) From a7b010bfe236238a8a7db3423bfb351bf0f8032a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 12:00:26 +0100 Subject: [PATCH 02/14] Newfile --- changelog.d/13118.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13118.misc diff --git a/changelog.d/13118.misc b/changelog.d/13118.misc new file mode 100644 index 000000000000..3bb51962e79e --- /dev/null +++ b/changelog.d/13118.misc @@ -0,0 +1 @@ +Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room. 
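A note on the new event_push_summary_last_receipt_stream_id table added in the schema delta above: the constant "Lock" column is a standard trick for keeping a table to exactly one row. Below is a minimal, self-contained sketch of the pattern, using an in-memory SQLite database purely for illustration; the table and column names echo the delta, but nothing here is Synapse code.

    import sqlite3

    # The constant, UNIQUE "Lock" column means a second INSERT always
    # violates the uniqueness constraint, so the table can never hold
    # more than one row; callers change the value with UPDATE instead.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE event_push_summary_last_receipt_stream_id (
            Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,
            stream_id BIGINT NOT NULL,
            CHECK (Lock='X')
        )
        """
    )
    conn.execute(
        "INSERT INTO event_push_summary_last_receipt_stream_id (stream_id) VALUES (0)"
    )
    try:
        conn.execute(
            "INSERT INTO event_push_summary_last_receipt_stream_id (stream_id) VALUES (1)"
        )
    except sqlite3.IntegrityError:
        pass  # the second row is rejected, as intended

The CHECK (Lock='X') constraint additionally prevents a second row being inserted with some other Lock value.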
From 22bd55104e35449080f7fd5284eb0eda21c50107 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 12:15:24 +0100 Subject: [PATCH 03/14] Only handle receipts for own users --- .../databases/main/event_push_actions.py | 18 ++++++++++++++++-- tests/storage/test_event_push_actions.py | 2 +- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 311ca8c90c9b..3defb443698e 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -846,12 +846,22 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: sql = """ SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering FROM receipts_linearized AS r, event_push_summary_last_receipt_stream_id AS eps, events AS e - WHERE r.stream_id > eps.stream_id AND r.event_id = e.event_id + WHERE r.stream_id > eps.stream_id AND r.event_id = e.event_id AND user_id LIKE ? ORDER BY r.stream_id ASC LIMIT ? """ - txn.execute(sql, (limit,)) + # We only want local users, so we add a dodgy filter to the above query + # and recheck it below. + user_filter = "%:" + self.hs.hostname + + txn.execute( + sql, + ( + user_filter, + limit, + ), + ) rows = txn.fetchall() if not rows: @@ -860,6 +870,10 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: # For each new read receipt we delete push actions from before it and # recalculate the summary. for _, room_id, user_id, stream_ordering in rows: + # Only handle our own read receipts. + if not self.hs.is_mine_id(user_id): + continue + txn.execute( """ DELETE FROM event_push_actions diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index e10624767ce5..ef069a8110b8 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -55,7 +55,7 @@ def test_get_unread_push_actions_for_user_in_range_for_email(self) -> None: def test_count_aggregation(self) -> None: room_id = "!foo:example.com" - user_id = "@user1235:example.com" + user_id = "@user1235:test" last_read_stream_ordering = [0] From 3fb73c3d5a73ae36475f9199e25b7c0a39550302 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 12:21:14 +0100 Subject: [PATCH 04/14] Handle remote receipts better --- .../databases/main/event_push_actions.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3defb443698e..811de60423a4 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -865,6 +865,16 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: rows = txn.fetchall() if not rows: + # We always update `event_push_summary_last_receipt_stream_id` to + # ensure that we don't rescan the same receipts for remote users. + # + # This requires repeatable read to be safe. 
+ txn.execute( + """ + UPDATE event_push_summary_last_receipt_stream_id + SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized) + """ + ) return True # For each new read receipt we delete push actions from before it and @@ -908,13 +918,15 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: }, ) - last_stream_id = rows[-1][0] - - self.db_pool.simple_update_one_txn( - txn, - table="event_push_summary_last_receipt_stream_id", - keyvalues={}, - updatevalues={"stream_id": last_stream_id}, + # We always update `event_push_summary_last_receipt_stream_id` to + # ensure that we don't rescan the same receipts for remote users. + # + # This requires repeatable read to be safe. + txn.execute( + """ + UPDATE event_push_summary_last_receipt_stream_id + SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized) + """ ) return len(rows) < limit From e8c71de6281475a8c467c64125ec67e6e66b57e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jun 2022 12:29:26 +0100 Subject: [PATCH 05/14] Speed up query --- .../storage/databases/main/event_push_actions.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 811de60423a4..e44add4b4d3d 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -843,10 +843,18 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: limit = 100 + min_stream_id = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_last_receipt_stream_id", + keyvalues={}, + retcol="stream_id", + ) + sql = """ SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering - FROM receipts_linearized AS r, event_push_summary_last_receipt_stream_id AS eps, events AS e - WHERE r.stream_id > eps.stream_id AND r.event_id = e.event_id AND user_id LIKE ? + FROM receipts_linearized AS r + INNER JOIN events AS e USING (event_id) + WHERE r.stream_id > ? AND user_id LIKE ? ORDER BY r.stream_id ASC LIMIT ? """ @@ -858,6 +866,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: txn.execute( sql, ( + min_stream_id, user_filter, limit, ), From 0081a41b170abf1ea5691225f0c1ed5f62897ce9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:39:39 +0100 Subject: [PATCH 06/14] Remove unused function --- .../databases/main/event_push_actions.py | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index e44add4b4d3d..b2bf73e6a753 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1159,28 +1159,6 @@ def remove_old_push_actions_that_have_rotated_txn( if done: break - def _remove_old_push_actions_before_txn( - self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int - ) -> None: - """ - Purges old push actions for a user and room before a given - stream_ordering. - - We however keep a months worth of highlighted notifications, so that - users can still get a list of recent highlights. - - Args: - txn: The transaction - room_id: Room ID to delete from - user_id: user ID to delete for - stream_ordering: The lowest stream ordering which will - not be deleted. 
- """ - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id, user_id), - ) - class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" From f8df221ff52254d40249105bc7a1062a8b766f24 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:40:04 +0100 Subject: [PATCH 07/14] Remove debug logging --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b2bf73e6a753..0439eeec1cdb 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -813,7 +813,7 @@ async def _rotate_notifs(self) -> None: try: # First we handle any new receipts that have happened. while True: - logger.info("Handling new receipts") + logger.debug("Handling new receipts") caught_up = await self.db_pool.runInteraction( "_handle_new_receipts_for_notifs_txn", From 456e9fd6a74bf8c674868dc890f7f1eccae3d1dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:40:27 +0100 Subject: [PATCH 08/14] Remove pointless early return --- .../storage/databases/main/event_push_actions.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 0439eeec1cdb..3aaac86378d9 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -873,19 +873,6 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) rows = txn.fetchall() - if not rows: - # We always update `event_push_summary_last_receipt_stream_id` to - # ensure that we don't rescan the same receipts for remote users. - # - # This requires repeatable read to be safe. - txn.execute( - """ - UPDATE event_push_summary_last_receipt_stream_id - SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized) - """ - ) - return True - # For each new read receipt we delete push actions from before it and # recalculate the summary. for _, room_id, user_id, stream_ordering in rows: From 7dea1610e151d6e97a6465449db92ca9b4254c0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:43:52 +0100 Subject: [PATCH 09/14] Add comment about why repeatable read --- synapse/storage/databases/main/event_push_actions.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3aaac86378d9..fe37e49f8d37 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -917,7 +917,12 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: # We always update `event_push_summary_last_receipt_stream_id` to # ensure that we don't rescan the same receipts for remote users. # - # This requires repeatable read to be safe. + # This requires repeatable read to be safe, as we need the + # `MAX(stream_id)` to not include any new rows that have been committed + # since the start of the transaction (since those rows won't have been + # returned by the query above). Alternatively we could query the max + # stream ID at the start of the transaction and bound everything by + # that. 
txn.execute( """ UPDATE event_push_summary_last_receipt_stream_id From 6235916174f07b947e122c3c20b081ab114e2123 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:45:42 +0100 Subject: [PATCH 10/14] Expand comment about handling receipts --- synapse/storage/databases/main/event_push_actions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index fe37e49f8d37..f1e4ba398a22 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -811,7 +811,8 @@ async def _rotate_notifs(self) -> None: self._doing_notif_rotation = True try: - # First we handle any new receipts that have happened. + # First we recalculate push summaries and delete stale push actions + # for rooms/users with new receipts. while True: logger.debug("Handling new receipts") From 12efcd74d221d30d6a39ea80160ca112ef8c782b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:46:10 +0100 Subject: [PATCH 11/14] Update synapse/storage/databases/main/event_push_actions.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/event_push_actions.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index f1e4ba398a22..58d011395a6b 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -840,7 +840,12 @@ async def _rotate_notifs(self) -> None: self._doing_notif_rotation = False def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: - """Check for new read receipts and delete from event push actions.""" + """Check for new read receipts and delete from event push actions. + + Any push actions which predate the user's most recent read receipt are + now redundant, so we can remove them from `event_push_actions` and + update `event_push_summary`. + """ limit = 100 From 681c0708d96ebfc01d610ede949906937585b7a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:54:07 +0100 Subject: [PATCH 12/14] Add comment about event_push_summary when calculating counts --- synapse/storage/databases/main/event_push_actions.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 58d011395a6b..ae2e52716285 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -236,8 +236,10 @@ def _get_unread_counts_by_pos_txn( # First we pull the counts from the summary table. # # We check that `last_receipt_stream_ordering` matches the stream - # ordering given, if it doesn't then a new read receipt hasn't been - # handled yet and so we the data in the table is stale. + # ordering given, if it doesn't then a new read receipt has arrived and + # we haven't yet updated the counts in `event_push_summary` to reflect + # that. In the latter case we simply ignore `event_push_summary` counts + # and do a manual count of rows in `event_push_actions` table below. # # If `last_receipt_stream_ordering` is null then that means its up to # date. 
@@ -274,9 +276,9 @@ def _get_unread_counts_by_pos_txn( if row: counts.highlight_count += row[0] - # Finally we need to count push actions that haven't been summarized - # yet. - # We only want to pull out push actions that we haven't summarized yet. + # Finally we need to count push actions that aren't included in the + # summary returned above, e.g. recent events that haven't been + # summarized yet, or the summary is empty due to a recent read receipt. stream_ordering = max(stream_ordering, summary_stream_ordering) notify_count, unread_count = self._get_notif_unread_count_for_user_room( txn, room_id, user_id, stream_ordering From 840c253f959a19c903ac950a05954a286b50a071 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 11:56:48 +0100 Subject: [PATCH 13/14] Explain why null means 'skip check' --- synapse/storage/databases/main/event_push_actions.py | 4 +++- .../schema/main/delta/72/01event_push_summary_receipt.sql | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index ae2e52716285..8aa67fc8cd7f 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -242,7 +242,9 @@ def _get_unread_counts_by_pos_txn( # and do a manual count of rows in `event_push_actions` table below. # # If `last_receipt_stream_ordering` is null then that means its up to - # date. + # date (as the row was written by an older version of Synapse that + # updated `event_push_summary` synchronously when persisting a new read + # receipt). txn.execute( """ SELECT stream_ordering, notif_count, COALESCE(unread_count, 0) diff --git a/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql b/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql index 063019471e6d..e45db615297f 100644 --- a/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql +++ b/synapse/storage/schema/main/delta/72/01event_push_summary_receipt.sql @@ -15,8 +15,11 @@ -- Add a column that records the position of the read receipt for the user at -- the time we summarised the push actions. This is used to check if the counts --- are up to date after a new read receipt has been sent. Null means that we can --- skip that check. +-- are up to date after a new read receipt has been sent. +-- +-- Null means that we can skip that check, as the row was written by an older +-- version of Synapse that updated `event_push_summary` synchronously when +-- persisting a new read receipt ALTER TABLE event_push_summary ADD COLUMN last_receipt_stream_ordering BIGINT; From 362719750f9e2f6e4793e177ba09f983524f783a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jun 2022 12:19:14 +0100 Subject: [PATCH 14/14] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/event_push_actions.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 8aa67fc8cd7f..80ca2fd0b612 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -236,12 +236,13 @@ def _get_unread_counts_by_pos_txn( # First we pull the counts from the summary table. 
# # We check that `last_receipt_stream_ordering` matches the stream - # ordering given, if it doesn't then a new read receipt has arrived and + # ordering given. If it doesn't match then a new read receipt has arrived and # we haven't yet updated the counts in `event_push_summary` to reflect - # that. In the latter case we simply ignore `event_push_summary` counts - # and do a manual count of rows in `event_push_actions` table below. + # that; in that case we simply ignore `event_push_summary` counts + # and do a manual count of all of the rows in the `event_push_actions` table + # for this user/room. # - # If `last_receipt_stream_ordering` is null then that means its up to + # If `last_receipt_stream_ordering` is null then that means it's up to # date (as the row was written by an older version of Synapse that # updated `event_push_summary` synchronously when persisting a new read # receipt).
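Taken together, the comments above describe a single predicate: the cached counts in event_push_summary may only be used if the row was built for the read receipt being queried, with NULL acting as a compatibility marker for rows written by older Synapse versions that updated the summary synchronously. Below is a rough standalone restatement of that predicate; the function and parameter names are invented for clarity, and the authoritative version is the SQL WHERE clause added in the first patch of this series.

    from typing import Optional

    def summary_row_is_usable(
        last_receipt_stream_ordering: Optional[int],
        summary_stream_ordering: int,
        receipt_stream_ordering: int,
    ) -> bool:
        # Legacy rows (NULL marker) were updated synchronously when the
        # receipt was persisted, so only the original check applies: the
        # summary's stream ordering must be beyond the receipt.
        if last_receipt_stream_ordering is None:
            return summary_stream_ordering > receipt_stream_ordering
        # Otherwise the row is only trusted if it was recalculated for
        # exactly this receipt position.
        return last_receipt_stream_ordering == receipt_stream_ordering

When the predicate is false, the caller ignores event_push_summary and falls back to counting rows in event_push_actions directly, as described in the comment.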