Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Background update to populate new `events` table columns (#13215)
Browse files Browse the repository at this point in the history
These columns were added back in Synapse 1.52, and have been populated for new
events since then. It's now (beyond) time to back-populate them for existing
events.
  • Loading branch information
richvdh authored Jul 15, 2022
1 parent 7be954f commit b116d3c
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/13215.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
87 changes: 87 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class _BackgroundUpdates:
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"

EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
Expand Down Expand Up @@ -253,6 +255,11 @@ def __init__(
replaces_index="ev_edges_id",
)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
self._background_events_populate_state_key_rejections,
)

async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
Expand Down Expand Up @@ -1399,3 +1406,83 @@ def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
)

return batch_size

async def _background_events_populate_state_key_rejections(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Back-populate `events.state_key` and `events.rejection_reason`.

    Both columns are populated for newly-persisted events; this background
    update fills them in for rows written before the columns existed, walking
    the `events` table in `batch_size` chunks ordered by `stream_ordering`.

    Args:
        progress: persisted progress, with `min_stream_ordering_exclusive` and
            `max_stream_ordering_inclusive` bounding the remaining work.
        batch_size: maximum number of rows to update in one transaction.

    Returns:
        The number of rows handled in this batch (used by the background
        updater to tune subsequent batch sizes).
    """

    min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
    max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]

    def _populate_txn(txn: LoggingTransaction) -> bool:
        """Returns True if we're done."""

        # first we need to find an endpoint.
        # we need to find the final row in the batch of batch_size, which means
        # we need to skip over (batch_size-1) rows and get the next row.
        txn.execute(
            """
            SELECT stream_ordering FROM events
            WHERE stream_ordering > ? AND stream_ordering <= ?
            ORDER BY stream_ordering
            LIMIT 1 OFFSET ?
            """,
            (
                min_stream_ordering_exclusive,
                max_stream_ordering_inclusive,
                batch_size - 1,
            ),
        )

        endpoint = None
        row = txn.fetchone()
        if row:
            endpoint = row[0]

        where_clause = "stream_ordering > ?"
        args = [min_stream_ordering_exclusive]
        # NB: compare against None explicitly. A falsy endpoint (notably a
        # stream_ordering of 0 — reachable, since backfilled events have
        # negative stream orderings) is a legitimate batch boundary; treating
        # it as "no endpoint" would drop the upper bound and update every
        # remaining row in a single, unbounded transaction.
        if endpoint is not None:
            where_clause += " AND stream_ordering <= ?"
            args.append(endpoint)

        # now do the updates.
        txn.execute(
            f"""
            UPDATE events
            SET state_key = (SELECT state_key FROM state_events se WHERE se.event_id = events.event_id),
            rejection_reason = (SELECT reason FROM rejections rej WHERE rej.event_id = events.event_id)
            WHERE ({where_clause})
            """,
            args,
        )

        logger.info(
            "populated new `events` columns up to %s/%i: updated %i rows",
            endpoint,
            max_stream_ordering_inclusive,
            txn.rowcount,
        )

        if endpoint is None:
            # fewer than batch_size rows remained, so they were all updated
            # above: the background update is complete.
            return True

        # Persist progress within the same transaction, so that a restart
        # resumes from this endpoint rather than redoing the batch.
        progress["min_stream_ordering_exclusive"] = endpoint
        self.db_pool.updates._background_update_progress_txn(
            txn,
            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
            progress,
        )
        return False

    done = await self.db_pool.runInteraction(
        desc="events_populate_state_key_rejections", func=_populate_txn
    )

    if done:
        await self.db_pool.updates._end_background_update(
            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
        )

    return batch_size
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2022 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from synapse.storage.types import Cursor


def run_create(cur: Cursor, database_engine, *args, **kwargs):
    """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""

    # Any event written since schema_version 68 already has these columns
    # populated (and rollback past that point is no longer possible), so only
    # pre-existing rows need updating. The current min/max stream orderings
    # therefore bound everything the background update has to touch.
    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
    (min_stream_ordering, max_stream_ordering) = cur.fetchone()

    # An empty events table leaves nothing to back-populate.
    if min_stream_ordering is None:
        return

    progress_json = json.dumps(
        {
            "min_stream_ordering_exclusive": min_stream_ordering - 1,
            "max_stream_ordering_inclusive": max_stream_ordering,
        }
    )
    cur.execute(
        "INSERT into background_updates (ordering, update_name, progress_json)"
        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
        (progress_json,),
    )

0 comments on commit b116d3c

Please sign in to comment.