Bg update to populate new events table columns
These columns were added back in Synapse 1.52, and have been populated for new
events since then. It's now (beyond) time to back-populate them for existing
events.
richvdh committed Jul 7, 2022
1 parent 0c95313 commit c3132e2
Showing 3 changed files with 147 additions and 0 deletions.
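Before the diffs, a rough sketch of the background-update protocol that the new handler plugs into may be useful. This is an illustration inferred from the code below, not Synapse's actual updater: a handler is registered under a name, is called repeatedly with its persisted progress dict and a batch size, and signals completion by ending the update.

# A minimal sketch (not Synapse's real updater) of the protocol used below.
from typing import Any, Awaitable, Callable, Dict

JsonDict = Dict[str, Any]
Handler = Callable[[JsonDict, int], Awaitable[int]]

class MiniUpdater:
    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}

    def register_background_update_handler(self, name: str, handler: Handler) -> None:
        self.handlers[name] = handler

    async def _end_background_update(self, name: str) -> None:
        # the real updater also deletes the row from the background_updates table
        del self.handlers[name]

    async def run(self, name: str, progress: JsonDict, batch_size: int) -> None:
        # drive the handler until it calls _end_background_update on itself
        while name in self.handlers:
            await self.handlers[name](progress, batch_size)

The handler added below follows this shape: it persists its new lower bound into the progress dict after each batch, returns the batch size, and ends the update once it runs out of rows.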
1 change: 1 addition & 0 deletions changelog.d/13215.misc
@@ -0,0 +1 @@
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
99 changes: 99 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -67,6 +67,8 @@ class _BackgroundUpdates:
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"

EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -253,6 +255,11 @@ def __init__(
replaces_index="ev_edges_id",
)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
self._background_events_populate_state_key_rejections,
)

async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1399,3 +1406,95 @@ def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
)

return batch_size

async def _background_events_populate_state_key_rejections(
self, progress: JsonDict, batch_size: int
) -> int:
"""Back-populate `events.state_key` and `events.rejection_reason"""

min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]

def _populate_txn(txn: LoggingTransaction) -> bool:
"""Returns True if we're done."""

            # First we need to find an endpoint for this batch: the
            # stream_ordering of the final row in a batch of batch_size, which
            # means skipping over (batch_size-1) rows and taking the next one.
txn.execute(
"""
SELECT stream_ordering FROM events
WHERE stream_ordering > ? AND stream_ordering <= ?
ORDER BY stream_ordering
LIMIT 1 OFFSET ?
""",
(
min_stream_ordering_exclusive,
max_stream_ordering_inclusive,
batch_size - 1,
),
)

endpoint = None
row = txn.fetchone()
if row:
endpoint = row[0]

where_clause = "e.stream_ordering > ?"
args = [min_stream_ordering_exclusive]
if endpoint:
where_clause += " AND e.stream_ordering <= ?"
args.append(endpoint)

            # now do the updates. We consider rows within our range of stream
            # orderings, but only those with a non-null rejection reason or
            # state_key (since there is nothing to update for rows where
            # rejection reason and state_key are both null).
txn.execute(
f"""
WITH t AS (
SELECT e.event_id, r.reason, se.state_key
FROM events e
LEFT JOIN rejections r USING (event_id)
LEFT JOIN state_events se USING (event_id)
WHERE ({where_clause}) AND (
r.reason IS NOT NULL OR se.state_key IS NOT NULL
)
)
UPDATE events
SET rejection_reason=t.reason, state_key=t.state_key
FROM t WHERE events.event_id = t.event_id
""",
args,
)

logger.info(
"populated new `events` columns up to %s/%i: updated %i/%i rows",
endpoint,
max_stream_ordering_inclusive,
txn.rowcount,
batch_size,
)

if endpoint is None:
# we're done
return True

progress["min_stream_ordering_exclusive"] = endpoint
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
progress,
)
return False

done = await self.db_pool.runInteraction(
desc="events_populate_state_key_rejections", func=_populate_txn
)

if done:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
)

return batch_size
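The core trick in _populate_txn — find the batch's endpoint with LIMIT 1 OFFSET batch_size - 1, then run a single UPDATE ... FROM over a CTE bounded by that endpoint — can be exercised standalone. Below is a self-contained sketch against an in-memory SQLite database (UPDATE ... FROM requires SQLite 3.33+; the statement works unchanged on PostgreSQL). The schema is pared down and the sample data invented for illustration.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering INTEGER,
                         state_key TEXT, rejection_reason TEXT);
    CREATE TABLE state_events (event_id TEXT PRIMARY KEY, state_key TEXT);
    CREATE TABLE rejections (event_id TEXT PRIMARY KEY, reason TEXT);
    INSERT INTO events (event_id, stream_ordering)
        VALUES ('$create', 1), ('$rejected', 2), ('$message', 3);
    INSERT INTO state_events VALUES ('$create', '');
    INSERT INTO rejections VALUES ('$rejected', 'auth error');
    """
)

def populate_batch(lo_exclusive, hi_inclusive, batch_size):
    """One pass of the back-fill; returns the new lower bound, or None when done."""
    # find the stream_ordering of the last row in this batch by skipping
    # over batch_size - 1 rows, exactly as _populate_txn does above
    row = conn.execute(
        "SELECT stream_ordering FROM events"
        " WHERE stream_ordering > ? AND stream_ordering <= ?"
        " ORDER BY stream_ordering LIMIT 1 OFFSET ?",
        (lo_exclusive, hi_inclusive, batch_size - 1),
    ).fetchone()
    endpoint = row[0] if row else hi_inclusive
    conn.execute(
        """
        WITH t AS (
            SELECT e.event_id, r.reason, se.state_key
            FROM events e
            LEFT JOIN rejections r USING (event_id)
            LEFT JOIN state_events se USING (event_id)
            WHERE e.stream_ordering > ? AND e.stream_ordering <= ?
                AND (r.reason IS NOT NULL OR se.state_key IS NOT NULL)
        )
        UPDATE events SET rejection_reason = t.reason, state_key = t.state_key
        FROM t WHERE events.event_id = t.event_id
        """,
        (lo_exclusive, endpoint),
    )
    return endpoint if row else None

lo = 0
while lo is not None:
    lo = populate_batch(lo, hi_inclusive=3, batch_size=2)

print(conn.execute("SELECT * FROM events ORDER BY stream_ordering").fetchall())
# [('$create', 1, '', None), ('$rejected', 2, None, 'auth error'), ('$message', 3, None, None)]

Note that '$message', which is neither a state event nor rejected, is left untouched: the CTE's IS NOT NULL filter is what keeps the UPDATE from rewriting every row in the range.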
@@ -0,0 +1,47 @@
# Copyright 2022 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from synapse.storage.types import Cursor


def run_create(cur: Cursor, database_engine, *args, **kwargs):
"""Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""

    # We know that any new events will have the columns populated (and that has
    # been the case since schema_version 68), so there is no chance of rolling
    # back to a version which leaves them unpopulated.
#
# So, we only need to make sure that existing rows are updated. We read the
# current min and max stream orderings, since that is guaranteed to include all
# the events that were stored before the new columns were added.
cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
(min_stream_ordering, max_stream_ordering) = cur.fetchone()

if min_stream_ordering is None:
# no rows, nothing to do.
return

cur.execute(
"INSERT into background_updates (ordering, update_name, progress_json)"
" VALUES (7203, 'events_populate_state_key_rejections', ?)",
(
json.dumps(
{
"min_stream_ordering_exclusive": min_stream_ordering - 1,
"max_stream_ordering_inclusive": max_stream_ordering,
}
),
),
)
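To make the exclusive/inclusive bounds concrete, here is a small hypothetical walk-through (the numbers are invented, not from the commit). If the oldest and newest rows in events have stream orderings 1 and 50000, run_create seeds the update with the progress below, and the handler's first batch then starts at stream ordering 1:

import json

# hypothetical bounds, as if read by run_create above
min_stream_ordering, max_stream_ordering = 1, 50000

progress = {
    # exclusive lower bound: one below the oldest row, so the first batch
    # picks up stream_ordering 1 itself
    "min_stream_ordering_exclusive": min_stream_ordering - 1,
    # inclusive upper bound: anything newer was stored after the columns were
    # added and is already populated
    "max_stream_ordering_inclusive": max_stream_ordering,
}
print(json.dumps(progress))
# {"min_stream_ordering_exclusive": 0, "max_stream_ordering_inclusive": 50000}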
