diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 2681917d0b6a..22675942d57d 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -38,6 +38,7 @@ from synapse.logging.opentracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause +from synapse.storage.background_updates import ForeignKeyConstraint from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -140,6 +141,15 @@ def __init__( self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) + if isinstance(self.database_engine, PostgresEngine): + self.db_pool.updates.register_background_validate_constraint_and_delete_rows( + update_name="event_forward_extremities_constraint_update", + table="event_forward_extremities", + constraint_name="event_forward_extremities_event_id", + constraint=ForeignKeyConstraint("events", [("event_id", "event_id")]), + unique_columns=("event_id", "room_id"), + ) + async def get_auth_chain( self, room_id: str, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index e2e6eb479f61..f94ab719bfb2 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -415,12 +415,6 @@ def _persist_events_txn( backfilled=False, ) - self._update_forward_extremities_txn( - txn, - new_forward_extremities=new_forward_extremities, - max_stream_order=max_stream_order, - ) - # Ensure that we don't have the same event twice. events_and_contexts = self._filter_events_and_contexts_for_duplicates( events_and_contexts @@ -439,6 +433,12 @@ def _persist_events_txn( self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._update_forward_extremities_txn( + txn, + new_forward_extremities=new_forward_extremities, + max_stream_order=max_stream_order, + ) + self._persist_transaction_ids_txn(txn, events_and_contexts) # Insert into event_to_state_groups. diff --git a/synapse/storage/schema/main/delta/78/03event_extremities_constraints.py b/synapse/storage/schema/main/delta/78/03event_extremities_constraints.py new file mode 100644 index 000000000000..c7a2f430aa3e --- /dev/null +++ b/synapse/storage/schema/main/delta/78/03event_extremities_constraints.py @@ -0,0 +1,57 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +This migration adds foreign key constraint to `event_forward_extremities` table. +""" +from synapse.storage.background_updates import ( + ForeignKeyConstraint, + run_validate_constraint_and_delete_rows_schema_delta, +) +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine + +FORWARD_EXTREMITIES_TABLE_SCHEMA = """ + CREATE TABLE event_forward_extremities2( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + UNIQUE (event_id, room_id), + CONSTRAINT event_forward_extremities_event_id FOREIGN KEY (event_id) REFERENCES events (event_id) + ); +""" + +FORWARD_EXTREMITIES_INDICES_SCHEMA = """ + CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); + CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); +""" + + +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: + run_validate_constraint_and_delete_rows_schema_delta( + cur, + ordering=7715, + update_name="event_forward_extremities_constraint_update", + table="event_forward_extremities", + constraint_name="event_forward_extremities_event_id", + constraint=ForeignKeyConstraint("events", [("event_id", "event_id")]), + sqlite_table_name="event_forward_extremities2", + sqlite_table_schema=FORWARD_EXTREMITIES_TABLE_SCHEMA, + sqlite_post_schema=FORWARD_EXTREMITIES_INDICES_SCHEMA, + ) + + # We can't add a similar constraint to `event_backward_extremities` as the + # events in there don't exist in the `events` table and `event_edges` + # doesn't have a unique constraint on `prev_event_id` (so we can't make a + # foreign key point to it). diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 4b8d8328d742..04daa3651556 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -101,6 +101,30 @@ def test_get_rooms_with_many_extremities(self) -> None: def insert_event(txn: Cursor, i: int, room_id: str) -> None: event_id = "$event_%i:local" % i + + # We need to insert into events table to get around the foreign key constraint. + self.store.db_pool.simple_insert_txn( + txn, + table="events", + values={ + "instance_name": "master", + "stream_ordering": self.store._stream_id_gen.get_next_txn(txn), + "topological_ordering": 1, + "depth": 1, + "event_id": event_id, + "room_id": room_id, + "type": EventTypes.Message, + "processed": True, + "outlier": False, + "origin_server_ts": 0, + "received_ts": 0, + "sender": "@user:local", + "contains_url": False, + "state_key": None, + "rejection_reason": None, + }, + ) + txn.execute( ( "INSERT INTO event_forward_extremities (room_id, event_id) " @@ -114,10 +138,14 @@ def insert_event(txn: Cursor, i: int, room_id: str) -> None: self.store.db_pool.runInteraction("insert", insert_event, i, room1) ) self.get_success( - self.store.db_pool.runInteraction("insert", insert_event, i, room2) + self.store.db_pool.runInteraction( + "insert", insert_event, i + 100, room2 + ) ) self.get_success( - self.store.db_pool.runInteraction("insert", insert_event, i, room3) + self.store.db_pool.runInteraction( + "insert", insert_event, i + 200, room3 + ) ) # Test simple case