diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 9c6deb56b95e..3494a40d896c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -773,9 +773,11 @@ async def add_push_actions_to_staging( # This is a helper function for generating the necessary tuple that # can be used to insert into the `event_push_actions_staging` table. + # + # TODO(threads): Return the event's thread_id instead of hard-coding main. def _gen_entry( user_id: str, actions: Collection[Union[Mapping, str]] - ) -> Tuple[str, str, str, int, int, int]: + ) -> Tuple[str, str, str, int, int, int, str]: is_highlight = 1 if _action_has_highlight(actions) else 0 notif = 1 if "notify" in actions else 0 return ( @@ -785,11 +787,20 @@ def _gen_entry( notif, # notif column is_highlight, # highlight column int(count_as_unread), # unread column + "main", # thread_id column ) await self.db_pool.simple_insert_many( "event_push_actions_staging", - keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"), + keys=( + "event_id", + "user_id", + "actions", + "notif", + "highlight", + "unread", + "thread_id", + ), values=[ _gen_entry(user_id, actions) for user_id, actions in user_id_actions.items() @@ -1070,6 +1081,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) # Replace the previous summary with the new counts. + # + # TODO(threads): Upsert per-thread instead of setting them all to main. self.db_pool.simple_upsert_txn( txn, table="event_push_summary", @@ -1079,6 +1092,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: "unread_count": unread_count, "stream_ordering": old_rotate_stream_ordering, "last_receipt_stream_ordering": stream_ordering, + "thread_id": "main", }, ) @@ -1227,17 +1241,19 @@ def _rotate_notifs_before_txn( logger.info("Rotating notifications, handling %d rows", len(summaries)) + # TODO(threads): Update on a per-thread basis. self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", key_names=("user_id", "room_id"), key_values=[(user_id, room_id) for user_id, room_id in summaries], - value_names=("notif_count", "unread_count", "stream_ordering"), + value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"), value_values=[ ( summary.notif_count, summary.unread_count, summary.stream_ordering, + "main", ) for summary in summaries.values() ], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..c0b4080e4b3d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2192,9 +2192,9 @@ def _set_push_actions_for_event_and_users_txn( sql = """ INSERT INTO event_push_actions ( room_id, event_id, user_id, actions, stream_ordering, - topological_ordering, notif, highlight, unread + topological_ordering, notif, highlight, unread, thread_id ) - SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread + SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id FROM event_push_actions_staging WHERE event_id = ? """ diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 03e04c215d5c..22900ddf76e9 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -694,6 +694,7 @@ def _insert_linearized_receipt_txn( "stream_id": stream_id, "event_id": event_id, "data": json_encoder.encode(data), + "thread_id": None, }, # receipts_linearized has a unique constraint on # (user_id, room_id, receipt_type), so no need to lock @@ -848,6 +849,7 @@ def _insert_graph_receipt_txn( "user_id": user_id, "event_ids": json_encoder.encode(event_ids), "data": json_encoder.encode(data), + "thread_id": None, }, )