From b227cd386aa8845ba77244b2b9f8b75d54d676e2 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Oct 2022 13:45:17 +0100 Subject: [PATCH 01/21] Add `get_unread_counts_by_room_for_user` method --- .../databases/main/event_push_actions.py | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b283ab0f9c50..f00672223eca 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -74,6 +74,7 @@ """ import logging +from collections import defaultdict from typing import ( TYPE_CHECKING, Collection, @@ -463,6 +464,141 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: return result + async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: + """Get the notification count by room for a user. Only considers notifications, + no highlights or unreads, and threads are currently aggregated under their room. + + This function is intentionally not cached because it is called to calculate the + unread badge for notifications and thus the result is expected to change. + + Note that this function assumes the user is a member of the room. Because + summary rows are not removed when a user leaves a room, the caller must + filter out those results from the result. + """ + return await self.db_pool.runInteraction( + "get_unread_counts_by_room_for_user", + self._get_unread_counts_by_room_for_user_txn, + user_id, + ) + + def _get_unread_counts_by_room_for_user_txn( + self, txn: LoggingTransaction, user_id: str + ) -> Dict[str, int]: + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) + args.extend([user_id, user_id]) + + room_to_count: Dict[str, int] = defaultdict(int) + + # First get summary counts by room / thread for the user. Note we use a OR join + # condition here such that we handle both receipts with thread ID's and those + # without that get applied to any thread_id values. + sql = f""" + SELECT eps.room_id, notif_count + FROM event_push_summary AS eps + LEFT JOIN ( + SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + {receipt_types_clause} + AND user_id = ? + GROUP BY room_id, thread_id + ) AS receipts ON ( + eps.room_id = receipts.room_id + AND eps.thread_id = receipts.thread_id + ) OR ( + eps.room_id = receipts.room_id + AND receipts.thread_id IS NULL + ) + WHERE user_id = ? + AND notif_count != 0 + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > receipt_stream_ordering) + OR last_receipt_stream_ordering = receipt_stream_ordering + OR receipt_stream_ordering IS NULL + ) + """ + txn.execute(sql, args) + + for room_id, notif_count in txn: + room_to_count[room_id] += notif_count + + # Now get any event push actions that haven't been rotated using the same OR + # join and filter by receipt and event push summary rotated up to stream ordering. + sql = f""" + SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + LEFT JOIN ( + SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + {receipt_types_clause} + AND user_id = ? + GROUP BY room_id, thread_id + ) AS receipts ON ( + epa.room_id = receipts.room_id + AND epa.thread_id = receipts.thread_id + ) OR ( + epa.room_id = receipts.room_id + AND receipts.thread_id IS NULL + ) + WHERE user_id = ? + AND epa.notif = 1 + AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) + AND (receipt_stream_ordering IS NULL OR stream_ordering > receipt_stream_ordering) + GROUP BY (epa.room_id) + """ + txn.execute(sql, args) + + for room_id, notif_count in txn: + room_to_count[room_id] += notif_count + + room_id_clause, room_id_args = make_in_list_sql_clause( + self.database_engine, "epa.room_id", room_to_count.keys() + ) + + # Finally re-check event_push_actions for any rooms not in the summary, ignoring + # the rotated up-to position. This handles the case where a read receipt has arrived + # but not been rotated meaning the summary table is out of date, so we go back to + # the push actions table. + sql = f""" + SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + LEFT JOIN ( + SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + {receipt_types_clause} + AND user_id = ? + GROUP BY room_id, thread_id + ) AS receipts ON ( + epa.room_id = receipts.room_id + AND epa.thread_id = receipts.thread_id + ) OR ( + epa.room_id = receipts.room_id + AND receipts.thread_id IS NULL + ) + WHERE user_id = ? + AND NOT {room_id_clause} + AND epa.notif = 1 + AND (receipt_stream_ordering IS NULL OR stream_ordering > receipt_stream_ordering) + GROUP BY (epa.room_id) + """ + + args.extend(room_id_args) + txn.execute(sql, args) + + for room_id, notif_count in txn: + room_to_count[room_id] += notif_count + + return room_to_count + @cached(tree=True, max_entries=5000, iterable=True) async def get_unread_event_push_actions_by_room_for_user( self, From 9f9ac55528ae4719716561518ac2b19bf069875f Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Oct 2022 13:45:23 +0100 Subject: [PATCH 02/21] Test against new method --- tests/storage/test_event_push_actions.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index ee48920f84a9..53cfe155708c 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -175,6 +175,15 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None: ) self.assertEqual(counts.threads, {}) + aggregate_counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual(sum(aggregate_counts.values()), noitf_count) + def _create_event(highlight: bool = False) -> str: result = self.helper.send_event( room_id, From 09199419dc8e7ded93c4da8d248ec78210604ce1 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Oct 2022 13:45:56 +0100 Subject: [PATCH 03/21] Use `get_unread_counts_by_room_for_user` when calculating badge counts --- synapse/push/push_tools.py | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index edeba27a4553..07552366c4c6 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,7 +17,6 @@ from synapse.push.presentable_names import calculate_room_name, name_from_member_event from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore -from synapse.util.async_helpers import concurrently_execute async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int: @@ -26,23 +25,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - badge = len(invites) - room_notifs = [] - - async def get_room_unread_count(room_id: str) -> None: - room_notifs.append( - await store.get_unread_event_push_actions_by_room_for_user( - room_id, - user_id, - ) - ) - - await concurrently_execute(get_room_unread_count, joins, 10) - - for notifs in room_notifs: - # Combine the counts from all the threads. - notify_count = notifs.main_timeline.notify_count + sum( - n.notify_count for n in notifs.threads.values() - ) + room_to_count = await store.get_unread_counts_by_room_for_user(user_id) + for room_id, notify_count in room_to_count.items(): + if room_id not in joins: + continue if notify_count == 0: continue @@ -51,8 +37,10 @@ async def get_room_unread_count(room_id: str) -> None: # return one badge count per conversation badge += 1 else: - # increment the badge count by the number of unread messages in the room + # Increase badge by number of notifications in room + # NOTE: this includes threaded notifications as well as non-threaded badge += notify_count + return badge From 9ce71cb0b0d3e707f5fe0bdd0d648d3ea1377e8c Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 21 Oct 2022 13:50:33 +0100 Subject: [PATCH 04/21] Add changelog file --- changelog.d/14255.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14255.misc diff --git a/changelog.d/14255.misc b/changelog.d/14255.misc new file mode 100644 index 000000000000..39924659c73c --- /dev/null +++ b/changelog.d/14255.misc @@ -0,0 +1 @@ +Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar). From 0d9dda6e18d74984e032a2223425beb31ec76508 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 21 Oct 2022 15:45:12 +0100 Subject: [PATCH 05/21] Include return value in docstring Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/event_push_actions.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index f00672223eca..1a04ba8da635 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -474,6 +474,9 @@ async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, in Note that this function assumes the user is a member of the room. Because summary rows are not removed when a user leaves a room, the caller must filter out those results from the result. + + Returns: + A map of room ID to notification counts for the given user. """ return await self.db_pool.runInteraction( "get_unread_counts_by_room_for_user", From bd7c69470e6bc4847641ce7932687722f16ba9ec Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Oct 2022 21:38:33 +0100 Subject: [PATCH 06/21] Test aggregate notif counting with threads/mixed receipts --- tests/storage/test_event_push_actions.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 53cfe155708c..2778edae4c72 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -327,6 +327,15 @@ def _assert_counts( else: self.assertEqual(counts.threads, {}) + aggregate_counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual(sum(aggregate_counts.values()), noitf_count + thread_notif_count) + def _create_event( highlight: bool = False, thread_id: Optional[str] = None ) -> str: @@ -498,6 +507,15 @@ def _assert_counts( else: self.assertEqual(counts.threads, {}) + aggregate_counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual(sum(aggregate_counts.values()), noitf_count + thread_notif_count) + def _create_event( highlight: bool = False, thread_id: Optional[str] = None ) -> str: From ffaba6058057260d572ef4a0d7ef7d062e87c585 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Oct 2022 21:39:11 +0100 Subject: [PATCH 07/21] Rewrite aggregate count query using CTE & two joins This now correctly handles mixed threaded/unthreaded receipts. --- .../databases/main/event_push_actions.py | 120 +++++++++++------- 1 file changed, 75 insertions(+), 45 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1a04ba8da635..68935035b8f3 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -496,73 +496,96 @@ def _get_unread_counts_by_room_for_user_txn( room_to_count: Dict[str, int] = defaultdict(int) - # First get summary counts by room / thread for the user. Note we use a OR join - # condition here such that we handle both receipts with thread ID's and those - # without that get applied to any thread_id values. + # First get summary counts by room / thread for the user. We join the max receipt + # stream ordering both threaded & unthreaded and use the max to compare against + # the summary table. + max_clause = """MAX( + COALESCE(threaded_receipt_stream_ordering, 0), + COALESCE(unthreaded_receipt_stream_ordering, 0) + )""" + sql = f""" - SELECT eps.room_id, notif_count - FROM event_push_summary AS eps - LEFT JOIN ( - SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering + WITH all_receipts AS ( + SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering FROM receipts_linearized LEFT JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id, thread_id - ) AS receipts ON ( - eps.room_id = receipts.room_id - AND eps.thread_id = receipts.thread_id - ) OR ( - eps.room_id = receipts.room_id - AND receipts.thread_id IS NULL ) + SELECT eps.room_id, eps.thread_id, notif_count + FROM event_push_summary AS eps + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS threaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NOT NULL + ) AS threaded_receipts USING (room_id, thread_id) + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NULL + ) AS unthreaded_receipts USING (room_id) WHERE user_id = ? AND notif_count != 0 AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > receipt_stream_ordering) - OR last_receipt_stream_ordering = receipt_stream_ordering - OR receipt_stream_ordering IS NULL + (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) + OR last_receipt_stream_ordering = {max_clause} ) """ txn.execute(sql, args) - for room_id, notif_count in txn: + seen_thread_ids = set() + + for room_id, thread_id, notif_count in txn: room_to_count[room_id] += notif_count + seen_thread_ids.add(thread_id) # Now get any event push actions that haven't been rotated using the same OR # join and filter by receipt and event push summary rotated up to stream ordering. sql = f""" - SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count - FROM event_push_actions AS epa - LEFT JOIN ( - SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering + WITH all_receipts AS ( + SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering FROM receipts_linearized LEFT JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id, thread_id - ) AS receipts ON ( - epa.room_id = receipts.room_id - AND epa.thread_id = receipts.thread_id - ) OR ( - epa.room_id = receipts.room_id - AND receipts.thread_id IS NULL ) + SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS threaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NOT NULL + ) AS threaded_receipts USING (room_id, thread_id) + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NULL + ) AS unthreaded_receipts USING (room_id) WHERE user_id = ? AND epa.notif = 1 AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) - AND (receipt_stream_ordering IS NULL OR stream_ordering > receipt_stream_ordering) - GROUP BY (epa.room_id) + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + GROUP BY (epa.thread_id) """ txn.execute(sql, args) - for room_id, notif_count in txn: + for room_id, thread_id, notif_count in txn: + # Note: only count push actions we have valid summaries for with up to date receipt. + if thread_id not in seen_thread_ids: + continue room_to_count[room_id] += notif_count - room_id_clause, room_id_args = make_in_list_sql_clause( - self.database_engine, "epa.room_id", room_to_count.keys() + thread_id_clause, thread_ids_args = make_in_list_sql_clause( + self.database_engine, "epa.thread_id", seen_thread_ids ) # Finally re-check event_push_actions for any rooms not in the summary, ignoring @@ -570,31 +593,38 @@ def _get_unread_counts_by_room_for_user_txn( # but not been rotated meaning the summary table is out of date, so we go back to # the push actions table. sql = f""" - SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count - FROM event_push_actions AS epa - LEFT JOIN ( - SELECT room_id, thread_id, MAX(stream_ordering) AS receipt_stream_ordering + WITH all_receipts AS ( + SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering FROM receipts_linearized LEFT JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id, thread_id - ) AS receipts ON ( - epa.room_id = receipts.room_id - AND epa.thread_id = receipts.thread_id - ) OR ( - epa.room_id = receipts.room_id - AND receipts.thread_id IS NULL ) + SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS threaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NOT NULL + ) AS threaded_receipts USING (room_id, thread_id) + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NULL + ) AS unthreaded_receipts USING (room_id) WHERE user_id = ? - AND NOT {room_id_clause} + AND NOT {thread_id_clause} AND epa.notif = 1 - AND (receipt_stream_ordering IS NULL OR stream_ordering > receipt_stream_ordering) + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) GROUP BY (epa.room_id) """ - args.extend(room_id_args) + args.extend(thread_ids_args) txn.execute(sql, args) for room_id, notif_count in txn: From e343f6d467c5c9497f056cc2a3817fbd3ec6f1b0 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Oct 2022 21:40:31 +0100 Subject: [PATCH 08/21] De-duplicate common SQL into variables Makes the individual queries harder to read but also ensures they are matching and better displays the similarities in queries required to do notification counting. --- .../databases/main/event_push_actions.py | 76 ++++++------------- 1 file changed, 23 insertions(+), 53 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 68935035b8f3..260068774e25 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -496,15 +496,7 @@ def _get_unread_counts_by_room_for_user_txn( room_to_count: Dict[str, int] = defaultdict(int) - # First get summary counts by room / thread for the user. We join the max receipt - # stream ordering both threaded & unthreaded and use the max to compare against - # the summary table. - max_clause = """MAX( - COALESCE(threaded_receipt_stream_ordering, 0), - COALESCE(unthreaded_receipt_stream_ordering, 0) - )""" - - sql = f""" + receipts_cte = f""" WITH all_receipts AS ( SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering FROM receipts_linearized @@ -514,8 +506,9 @@ def _get_unread_counts_by_room_for_user_txn( AND user_id = ? GROUP BY room_id, thread_id ) - SELECT eps.room_id, eps.thread_id, notif_count - FROM event_push_summary AS eps + """ + + receipts_joins = """ LEFT JOIN ( SELECT room_id, thread_id, max_receipt_stream_ordering AS threaded_receipt_stream_ordering @@ -528,6 +521,21 @@ def _get_unread_counts_by_room_for_user_txn( FROM all_receipts WHERE thread_id IS NULL ) AS unthreaded_receipts USING (room_id) + """ + + # First get summary counts by room / thread for the user. We join the max receipt + # stream ordering both threaded & unthreaded and use the max to compare against + # the summary table. + max_clause = """MAX( + COALESCE(threaded_receipt_stream_ordering, 0), + COALESCE(unthreaded_receipt_stream_ordering, 0) + )""" + + sql = f""" + {receipts_cte} + SELECT eps.room_id, eps.thread_id, notif_count + FROM event_push_summary AS eps + {receipts_joins} WHERE user_id = ? AND notif_count != 0 AND ( @@ -546,29 +554,10 @@ def _get_unread_counts_by_room_for_user_txn( # Now get any event push actions that haven't been rotated using the same OR # join and filter by receipt and event push summary rotated up to stream ordering. sql = f""" - WITH all_receipts AS ( - SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering - FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) - WHERE - {receipt_types_clause} - AND user_id = ? - GROUP BY room_id, thread_id - ) + {receipts_cte} SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count FROM event_push_actions AS epa - LEFT JOIN ( - SELECT room_id, thread_id, - max_receipt_stream_ordering AS threaded_receipt_stream_ordering - FROM all_receipts - WHERE thread_id IS NOT NULL - ) AS threaded_receipts USING (room_id, thread_id) - LEFT JOIN ( - SELECT room_id, thread_id, - max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering - FROM all_receipts - WHERE thread_id IS NULL - ) AS unthreaded_receipts USING (room_id) + {receipts_joins} WHERE user_id = ? AND epa.notif = 1 AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) @@ -593,29 +582,10 @@ def _get_unread_counts_by_room_for_user_txn( # but not been rotated meaning the summary table is out of date, so we go back to # the push actions table. sql = f""" - WITH all_receipts AS ( - SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering - FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) - WHERE - {receipt_types_clause} - AND user_id = ? - GROUP BY room_id, thread_id - ) + {receipts_cte} SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count FROM event_push_actions AS epa - LEFT JOIN ( - SELECT room_id, thread_id, - max_receipt_stream_ordering AS threaded_receipt_stream_ordering - FROM all_receipts - WHERE thread_id IS NOT NULL - ) AS threaded_receipts USING (room_id, thread_id) - LEFT JOIN ( - SELECT room_id, thread_id, - max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering - FROM all_receipts - WHERE thread_id IS NULL - ) AS unthreaded_receipts USING (room_id) + {receipts_joins} WHERE user_id = ? AND NOT {thread_id_clause} AND epa.notif = 1 From 3cfe596c7b180f1ed9eccca4403df63d477354e4 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Oct 2022 21:49:55 +0100 Subject: [PATCH 09/21] Formatting --- tests/storage/test_event_push_actions.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 2778edae4c72..13b43d064ba8 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -334,7 +334,9 @@ def _assert_counts( user_id, ) ) - self.assertEqual(sum(aggregate_counts.values()), noitf_count + thread_notif_count) + self.assertEqual( + sum(aggregate_counts.values()), noitf_count + thread_notif_count + ) def _create_event( highlight: bool = False, thread_id: Optional[str] = None @@ -514,7 +516,9 @@ def _assert_counts( user_id, ) ) - self.assertEqual(sum(aggregate_counts.values()), noitf_count + thread_notif_count) + self.assertEqual( + sum(aggregate_counts.values()), noitf_count + thread_notif_count + ) def _create_event( highlight: bool = False, thread_id: Optional[str] = None From 72e909effff49d7e08fa161ebf19119a154ffa08 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Oct 2022 21:56:25 +0100 Subject: [PATCH 10/21] Use `GREATEST` instead of a `MAX` when using postgres --- .../databases/main/event_push_actions.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 260068774e25..c6e6da763866 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -96,6 +96,7 @@ DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + PostgresEngine, ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore @@ -526,10 +527,19 @@ def _get_unread_counts_by_room_for_user_txn( # First get summary counts by room / thread for the user. We join the max receipt # stream ordering both threaded & unthreaded and use the max to compare against # the summary table. - max_clause = """MAX( - COALESCE(threaded_receipt_stream_ordering, 0), - COALESCE(unthreaded_receipt_stream_ordering, 0) - )""" + # PostgreSQL and SQLite differ in comparing scalar numerics. + if isinstance(self.database_engine, PostgresEngine): + # GREATEST ignores NULLs. + max_clause = """GREATEST( + threaded_receipt_stream_ordering, + unthreaded_receipt_stream_ordering + )""" + else: + # MAX returns NULL if any are NULL, so COALESCE to 0 first. + max_clause = """MAX( + COALESCE(threaded_receipt_stream_ordering, 0), + COALESCE(unthreaded_receipt_stream_ordering, 0) + )""" sql = f""" {receipts_cte} From e79bc729b1ce9d5fad3c4109eac7dbeba03a9b2b Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 22 Oct 2022 23:16:19 +0100 Subject: [PATCH 11/21] Fix group by clauses --- synapse/storage/databases/main/event_push_actions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index c6e6da763866..4cd18837e003 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -573,7 +573,7 @@ def _get_unread_counts_by_room_for_user_txn( AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) - GROUP BY (epa.thread_id) + GROUP BY epa.room_id, epa.thread_id """ txn.execute(sql, args) @@ -601,7 +601,7 @@ def _get_unread_counts_by_room_for_user_txn( AND epa.notif = 1 AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) - GROUP BY (epa.room_id) + GROUP BY epa.room_id """ args.extend(thread_ids_args) From 54dae60ecdb751aa1819ab0effa6f24ab9b460e2 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Sun, 27 Nov 2022 16:50:31 +0000 Subject: [PATCH 12/21] Fix comment to match spec Co-authored-by: Patrick Cloke --- synapse/push/push_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 07552366c4c6..484ea50b8226 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -38,7 +38,7 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - badge += 1 else: # Increase badge by number of notifications in room - # NOTE: this includes threaded notifications as well as non-threaded + # NOTE: this includes threaded and unthreaded notifications. badge += notify_count return badge From e0a1646ade1e8c1f2b398a8800864499690f2a77 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Sun, 27 Nov 2022 16:51:03 +0000 Subject: [PATCH 13/21] Fix up docstring Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 4cd18837e003..1e74abe1e360 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -467,7 +467,7 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: """Get the notification count by room for a user. Only considers notifications, - no highlights or unreads, and threads are currently aggregated under their room. + not highlight or unread counts, and threads are currently aggregated under their room. This function is intentionally not cached because it is called to calculate the unread badge for notifications and thus the result is expected to change. From 330dd4b6df6cc9faba9e13ae1f15c4f4c54a6c00 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Sun, 27 Nov 2022 16:51:36 +0000 Subject: [PATCH 14/21] Fixup docstring Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1e74abe1e360..ca930280a0d8 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -470,7 +470,7 @@ async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, in not highlight or unread counts, and threads are currently aggregated under their room. This function is intentionally not cached because it is called to calculate the - unread badge for notifications and thus the result is expected to change. + unread badge for push notifications and thus the result is expected to change. Note that this function assumes the user is a member of the room. Because summary rows are not removed when a user leaves a room, the caller must From c510d1c0148dccb2840a3df49e111d203b500c18 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Sun, 27 Nov 2022 16:55:00 +0000 Subject: [PATCH 15/21] Tidy up comment Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/event_push_actions.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index ca930280a0d8..0d967d370a4a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -524,9 +524,10 @@ def _get_unread_counts_by_room_for_user_txn( ) AS unthreaded_receipts USING (room_id) """ - # First get summary counts by room / thread for the user. We join the max receipt - # stream ordering both threaded & unthreaded and use the max to compare against - # the summary table. + # First get summary counts by room / thread for the user. We use the max receipt + # stream ordering of both threaded & unthreaded receipts to compare against the + # summary table. + # # PostgreSQL and SQLite differ in comparing scalar numerics. if isinstance(self.database_engine, PostgresEngine): # GREATEST ignores NULLs. From 0f8524c1fb580a384f26f1204928b887c2889b36 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Sun, 27 Nov 2022 16:56:43 +0000 Subject: [PATCH 16/21] Fix query indents Co-authored-by: Patrick Cloke --- .../databases/main/event_push_actions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 0d967d370a4a..fabb74cd6f67 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -548,11 +548,11 @@ def _get_unread_counts_by_room_for_user_txn( FROM event_push_summary AS eps {receipts_joins} WHERE user_id = ? - AND notif_count != 0 - AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) - OR last_receipt_stream_ordering = {max_clause} - ) + AND notif_count != 0 + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) + OR last_receipt_stream_ordering = {max_clause} + ) """ txn.execute(sql, args) @@ -570,10 +570,10 @@ def _get_unread_counts_by_room_for_user_txn( FROM event_push_actions AS epa {receipts_joins} WHERE user_id = ? - AND epa.notif = 1 - AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) - AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) - AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + AND epa.notif = 1 + AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) GROUP BY epa.room_id, epa.thread_id """ txn.execute(sql, args) From dc9e3877fb2d6bd1fa4660927d646f1cd5559af8 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Mon, 28 Nov 2022 08:07:55 +0000 Subject: [PATCH 17/21] Fix tab/space indents --- .../databases/main/event_push_actions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index fabb74cd6f67..7c764dc77fc1 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -548,11 +548,11 @@ def _get_unread_counts_by_room_for_user_txn( FROM event_push_summary AS eps {receipts_joins} WHERE user_id = ? - AND notif_count != 0 - AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) - OR last_receipt_stream_ordering = {max_clause} - ) + AND notif_count != 0 + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) + OR last_receipt_stream_ordering = {max_clause} + ) """ txn.execute(sql, args) @@ -570,10 +570,10 @@ def _get_unread_counts_by_room_for_user_txn( FROM event_push_actions AS epa {receipts_joins} WHERE user_id = ? - AND epa.notif = 1 - AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) - AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) - AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + AND epa.notif = 1 + AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) GROUP BY epa.room_id, epa.thread_id """ txn.execute(sql, args) From de2d9ba772d38513eab4bd0ee0a07b94beea6655 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Mon, 28 Nov 2022 08:08:41 +0000 Subject: [PATCH 18/21] Add comment about room counts including stale counts --- synapse/push/push_tools.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 484ea50b8226..7ee07e4beebd 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -27,6 +27,8 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - room_to_count = await store.get_unread_counts_by_room_for_user(user_id) for room_id, notify_count in room_to_count.items(): + # room_to_count may include rooms which the user has left, + # ignore those. if room_id not in joins: continue From 72c7be1ca983be181b141f51756579eb82911e85 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Mon, 28 Nov 2022 08:15:25 +0000 Subject: [PATCH 19/21] Move variable definition closer to usage --- synapse/storage/databases/main/event_push_actions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 7c764dc77fc1..7ebe34f773ab 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -495,8 +495,6 @@ def _get_unread_counts_by_room_for_user_txn( ) args.extend([user_id, user_id]) - room_to_count: Dict[str, int] = defaultdict(int) - receipts_cte = f""" WITH all_receipts AS ( SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering @@ -557,6 +555,7 @@ def _get_unread_counts_by_room_for_user_txn( txn.execute(sql, args) seen_thread_ids = set() + room_to_count: Dict[str, int] = defaultdict(int) for room_id, thread_id, notif_count in txn: room_to_count[room_id] += notif_count From 406807965f5905d01ca4b4ddb0ed1edde8609fbf Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Mon, 28 Nov 2022 08:15:55 +0000 Subject: [PATCH 20/21] Fix typo in variable --- tests/storage/test_event_push_actions.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 13b43d064ba8..cc6d6c99d058 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -156,7 +156,7 @@ def test_count_aggregation(self) -> None: last_event_id: str - def _assert_counts(noitf_count: int, highlight_count: int) -> None: + def _assert_counts(notif_count: int, highlight_count: int) -> None: counts = self.get_success( self.store.db_pool.runInteraction( "get-unread-counts", @@ -168,7 +168,7 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None: self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, + notify_count=notif_count, unread_count=0, highlight_count=highlight_count, ), @@ -182,7 +182,7 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None: user_id, ) ) - self.assertEqual(sum(aggregate_counts.values()), noitf_count) + self.assertEqual(sum(aggregate_counts.values()), notif_count) def _create_event(highlight: bool = False) -> str: result = self.helper.send_event( @@ -292,7 +292,7 @@ def test_count_aggregation_threads(self) -> None: last_event_id: str def _assert_counts( - noitf_count: int, + notif_count: int, highlight_count: int, thread_notif_count: int, thread_highlight_count: int, @@ -308,7 +308,7 @@ def _assert_counts( self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, + notify_count=notif_count, unread_count=0, highlight_count=highlight_count, ), @@ -335,7 +335,7 @@ def _assert_counts( ) ) self.assertEqual( - sum(aggregate_counts.values()), noitf_count + thread_notif_count + sum(aggregate_counts.values()), notif_count + thread_notif_count ) def _create_event( @@ -474,7 +474,7 @@ def test_count_aggregation_mixed(self) -> None: last_event_id: str def _assert_counts( - noitf_count: int, + notif_count: int, highlight_count: int, thread_notif_count: int, thread_highlight_count: int, @@ -490,7 +490,7 @@ def _assert_counts( self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, + notify_count=notif_count, unread_count=0, highlight_count=highlight_count, ), @@ -517,7 +517,7 @@ def _assert_counts( ) ) self.assertEqual( - sum(aggregate_counts.values()), noitf_count + thread_notif_count + sum(aggregate_counts.values()), notif_count + thread_notif_count ) def _create_event( @@ -677,7 +677,7 @@ def _create_event(type: str, content: JsonDict) -> str: ) return result["event_id"] - def _assert_counts(noitf_count: int, thread_notif_count: int) -> None: + def _assert_counts(notif_count: int, thread_notif_count: int) -> None: counts = self.get_success( self.store.db_pool.runInteraction( "get-unread-counts", @@ -689,7 +689,7 @@ def _assert_counts(noitf_count: int, thread_notif_count: int) -> None: self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, unread_count=0, highlight_count=0 + notify_count=notif_count, unread_count=0, highlight_count=0 ), ) if thread_notif_count: From ed3809f90caf932ba9133007a5329a6fa0d210d8 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Mon, 28 Nov 2022 08:16:56 +0000 Subject: [PATCH 21/21] Only check aggregate counts for our target room --- tests/storage/test_event_push_actions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index cc6d6c99d058..5fa8bd2d98ce 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -182,7 +182,7 @@ def _assert_counts(notif_count: int, highlight_count: int) -> None: user_id, ) ) - self.assertEqual(sum(aggregate_counts.values()), notif_count) + self.assertEqual(aggregate_counts[room_id], notif_count) def _create_event(highlight: bool = False) -> str: result = self.helper.send_event( @@ -335,7 +335,7 @@ def _assert_counts( ) ) self.assertEqual( - sum(aggregate_counts.values()), notif_count + thread_notif_count + aggregate_counts[room_id], notif_count + thread_notif_count ) def _create_event( @@ -517,7 +517,7 @@ def _assert_counts( ) ) self.assertEqual( - sum(aggregate_counts.values()), notif_count + thread_notif_count + aggregate_counts[room_id], notif_count + thread_notif_count ) def _create_event(