Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Purge chain cover tables when purging events. #9498

Merged
merged 5 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9498.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Properly purge the event chain cover index when purging history.
42 changes: 37 additions & 5 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
"""Deletes room history before a certain point
"""Deletes room history before a certain point.

Note that only a single purge can occur at once; this is guaranteed at
a higher level (in the PaginationHandler).

Args:
room_id:
Expand All @@ -52,7 +55,9 @@ async def purge_history(
delete_local_events,
)

def _purge_history_txn(self, txn, room_id, token, delete_local_events):
def _purge_history_txn(
self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
) -> Set[int]:
# Tables that should be pruned:
# event_auth
# event_backward_extremities
Expand Down Expand Up @@ -103,7 +108,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):
if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
# having any backwards extremities)
raise SynapseError(
400, "topological_ordering is greater than forward extremeties"
)
Expand Down Expand Up @@ -154,7 +159,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):

logger.info("[purge] Finding new backward extremities")

# We calculate the new entries for the backward extremeties by finding
# We calculate the new entries for the backward extremities by finding
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
Expand Down Expand Up @@ -296,7 +301,7 @@ async def purge_room(self, room_id: str) -> List[int]:
"purge_room", self._purge_room_txn, room_id
)

def _purge_room_txn(self, txn, room_id):
def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# First we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
Expand All @@ -310,6 +315,31 @@ def _purge_room_txn(self, txn, room_id):

state_groups = [row[0] for row in txn]

# Get all the auth chains that are referenced by events that are to be
# deleted.
txn.execute(
"""
SELECT chain_id, sequence_number FROM events
LEFT JOIN event_auth_chains USING (event_id)
WHERE room_id = ?
""",
(room_id,),
)
referenced_chain_id_tuples = list(txn)

logger.info("[purge] removing events from event_auth_chain_links")
txn.executemany(
"""
DELETE FROM event_auth_chain_links WHERE
(origin_chain_id = ? AND origin_sequence_number = ?) OR
(target_chain_id = ? AND target_sequence_number = ?)
""",
(
(chain_id, seq_num, chain_id, seq_num)
for (chain_id, seq_num) in referenced_chain_id_tuples
),
clokep marked this conversation as resolved.
Show resolved Hide resolved
)

# Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
Expand All @@ -319,6 +349,8 @@ def _purge_room_txn(self, txn, room_id):
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"event_auth_chains",
"event_auth_chain_to_calculate",
"redactions",
"rejections",
"state_events",
Expand Down
5 changes: 0 additions & 5 deletions synapse/storage/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
Returns:
The set of state groups that can be deleted.
"""
# Graph of state group -> previous group
graph = {}

# Set of state groups that we have found to be referenced by events
referenced_groups = set()

Expand Down Expand Up @@ -111,8 +108,6 @@ async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
next_to_search |= prevs
state_groups_seen |= prevs

graph.update(edges)

to_delete = state_groups_seen - referenced_groups

return to_delete