Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Update the thread_id right before use (in case the bg update hasn't f…
Browse files Browse the repository at this point in the history
…inished) (#14222)

This avoids running a forced-update of a null thread_id rows.

An index is added (in the background) to hopefully make this
easier in the future.
  • Loading branch information
clokep authored Oct 18, 2022
1 parent e440f96 commit dbf18f5
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 134 deletions.
1 change: 1 addition & 0 deletions changelog.d/14222.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support for thread-specific notifications & receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771) and [MSC3773](https://github.com/matrix-org/matrix-spec-proposals/pull/3773)).
103 changes: 103 additions & 0 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,44 @@ def __init__(
self._background_backfill_thread_id,
)

# Indexes which will be used to quickly make the thread_id column non-null.
self.db_pool.updates.register_background_index_update(
"event_push_actions_thread_id_null",
index_name="event_push_actions_thread_id_null",
table="event_push_actions",
columns=["thread_id"],
where_clause="thread_id IS NULL",
)
self.db_pool.updates.register_background_index_update(
"event_push_summary_thread_id_null",
index_name="event_push_summary_thread_id_null",
table="event_push_summary",
columns=["thread_id"],
where_clause="thread_id IS NULL",
)

# Check ASAP (and then later, every 1s) to see if we have finished
# background updates the event_push_actions and event_push_summary tables.
self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
self._event_push_backfill_thread_id_done = False

@wrap_as_background_process("check_event_push_backfill_thread_id")
async def _check_event_push_backfill_thread_id(self) -> None:
"""
Has thread_id finished backfilling?
If not, we need to just-in-time update it so the queries work.
"""
done = await self.db_pool.updates.has_completed_background_update(
"event_push_backfill_thread_id"
)

if done:
self._event_push_backfill_thread_id_done = True
else:
# Reschedule to run.
self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)

async def _background_backfill_thread_id(
self, progress: JsonDict, batch_size: int
) -> int:
Expand Down Expand Up @@ -526,6 +564,25 @@ def _get_thread(thread_id: str) -> NotifCounts:
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)

# First ensure that the existing rows have an updated thread_id field.
if not self._event_push_backfill_thread_id_done:
txn.execute(
"""
UPDATE event_push_summary
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)
txn.execute(
"""
UPDATE event_push_actions
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)

# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
Expand Down Expand Up @@ -1341,6 +1398,25 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
(room_id, user_id, stream_ordering, *thread_args),
)

# First ensure that the existing rows have an updated thread_id field.
if not self._event_push_backfill_thread_id_done:
txn.execute(
"""
UPDATE event_push_summary
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)
txn.execute(
"""
UPDATE event_push_actions
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
(MAIN_TIMELINE, room_id, user_id),
)

# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
unread_counts = self._get_notif_unread_count_for_user_room(
Expand Down Expand Up @@ -1475,6 +1551,19 @@ def _rotate_notifs_before_txn(
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
"""

# Ensure that any new actions have an updated thread_id.
if not self._event_push_backfill_thread_id_done:
txn.execute(
"""
UPDATE event_push_actions
SET thread_id = ?
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
""",
(MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
)

# XXX Do we need to update summaries here too?

# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id, thread_id,
Expand Down Expand Up @@ -1537,6 +1626,20 @@ def _rotate_notifs_before_txn(

logger.info("Rotating notifications, handling %d rows", len(summaries))

# Ensure that any updated threads have the proper thread_id.
if not self._event_push_backfill_thread_id_done:
txn.execute_batch(
"""
UPDATE event_push_summary
SET thread_id = ?
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
""",
[
(MAIN_TIMELINE, room_id, user_id)
for user_id, room_id, _ in summaries
],
)

self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
* limitations under the License.
*/

-- The columns can now be made non-nullable.
ALTER TABLE event_push_actions_staging ALTER COLUMN thread_id SET NOT NULL;
ALTER TABLE event_push_actions ALTER COLUMN thread_id SET NOT NULL;
ALTER TABLE event_push_summary ALTER COLUMN thread_id SET NOT NULL;
-- Allow there to be multiple summaries per user/room.
DROP INDEX IF EXISTS event_push_summary_unique_index;

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7306, 'event_push_actions_thread_id_null', '{}', 'event_push_backfill_thread_id');

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7306, 'event_push_summary_thread_id_null', '{}', 'event_push_backfill_thread_id');

This file was deleted.

0 comments on commit dbf18f5

Please sign in to comment.