From da4d9c3161f67d62a29bc718fd1f21bc14dc07d2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 22 Sep 2022 15:26:46 -0400 Subject: [PATCH] Handle threads when fetching events for push. --- .../databases/main/event_push_actions.py | 69 ++++++++++++++----- 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 622bb9f4e1b7..8cb162de46a2 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -119,6 +119,32 @@ ] +@attr.s(slots=True, auto_attribs=True) +class _RoomReceipt: + """ + HttpPushAction instances include the information used to generate HTTP + requests to a push gateway. + """ + + unthreaded_stream_ordering: int = 0 + # threaded_stream_ordering includes the main pseudo-thread. + threaded_stream_ordering: Dict[str, int] = attr.Factory(dict) + + def is_unread(self, thread_id: str, stream_ordering: int) -> bool: + """Returns True if the stream ordering is unread according to the receipt information.""" + + # Only include push actions with a stream ordering after both the unthreaded + # and threaded receipt. Properly handles a user without any receipts present. + return ( + self.unthreaded_stream_ordering < stream_ordering + and self.threaded_stream_ordering.get(thread_id, 0) < stream_ordering + ) + + +# A _RoomReceipt with no receipts in it. +MISSING_ROOM_RECEIPT = _RoomReceipt() + + @attr.s(slots=True, frozen=True, auto_attribs=True) class HttpPushAction: """ @@ -589,7 +615,7 @@ def f(txn: LoggingTransaction) -> List[str]: def _get_receipts_by_room_txn( self, txn: LoggingTransaction, user_id: str - ) -> Dict[str, int]: + ) -> Dict[str, _RoomReceipt]: """ Generate a map of room ID to the latest stream ordering that has been read by the given user. @@ -599,7 +625,8 @@ def _get_receipts_by_room_txn( user_id: The user to fetch receipts for. Returns: - A map of room ID to stream ordering for all rooms the user has a receipt in. + A map including all rooms the user is in with a receipt. It maps + room IDs to _RoomReceipt instances """ receipt_types_clause, args = make_in_list_sql_clause( self.database_engine, @@ -611,17 +638,26 @@ def _get_receipts_by_room_txn( ) sql = f""" - SELECT room_id, MAX(stream_ordering) + SELECT room_id, thread_id, MAX(stream_ordering) FROM receipts_linearized INNER JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id + GROUP BY room_id, thread_id """ args.extend((user_id,)) txn.execute(sql, args) - return dict(cast(List[Tuple[str, int]], txn.fetchall())) + + result = {} + for room_id, thread_id, stream_ordering in txn: + room_receipt = result.setdefault(room_id, _RoomReceipt()) + if thread_id is None: + room_receipt.unthreaded_stream_ordering = stream_ordering + else: + room_receipt.threaded_stream_ordering[thread_id] = stream_ordering + + return result async def get_unread_push_actions_for_user_in_range_for_http( self, @@ -656,7 +692,8 @@ def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool]]: sql = """ - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight + SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering, + ep.actions, ep.highlight FROM event_push_actions AS ep WHERE ep.user_id = ? @@ -679,10 +716,10 @@ def get_push_actions_txn( stream_ordering=stream_ordering, actions=_deserialize_action(actions, highlight), ) - for event_id, room_id, stream_ordering, actions, highlight in push_actions - # Only include push actions with a stream ordering after any receipt, or without any - # receipt present (invited to but never read rooms). - if stream_ordering > receipts_by_room.get(room_id, 0) + for event_id, room_id, thread_id, stream_ordering, actions, highlight in push_actions + if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread( + thread_id, stream_ordering + ) ] # Now sort it so it's ordered correctly, since currently it will @@ -728,8 +765,8 @@ def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool, int]]: sql = """ - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight, e.received_ts + SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering, + ep.actions, ep.highlight, e.received_ts FROM event_push_actions AS ep INNER JOIN events AS e USING (room_id, event_id) WHERE @@ -755,10 +792,10 @@ def get_push_actions_txn( actions=_deserialize_action(actions, highlight), received_ts=received_ts, ) - for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions - # Only include push actions with a stream ordering after any receipt, or without any - # receipt present (invited to but never read rooms). - if stream_ordering > receipts_by_room.get(room_id, 0) + for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions + if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread( + thread_id, stream_ordering + ) ] # Now sort it so it's ordered correctly, since currently it will