Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Servlet to purge old rooms #5845

Merged
merged 7 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5845.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an admin API to purge old rooms from the database.
18 changes: 18 additions & 0 deletions docs/admin_api/purge_room.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Purge room API
==============

This API will remove all trace of a room from your database.

All local users must have left the room before it can be removed.

The API is:

```
POST /_synapse/admin/v1/purge_room

{
"room_id": "!room:id"
}
```

You must authenticate using the access token of an admin user.
17 changes: 17 additions & 0 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self._server_name = hs.hostname

self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
Expand Down Expand Up @@ -153,6 +154,22 @@ def get_purge_status(self, purge_id):
"""
return self._purges_by_id.get(purge_id)

async def purge_room(self, room_id):
"""Purge the given room from the database"""
with (await self.pagination_lock.write(room_id)):
# check we know about the room
await self.store.get_room_version(room_id)

# first check that we have no users in this room
joined = await defer.maybeDeferred(
self.store.is_host_joined, room_id, self._server_name
)

if joined:
raise SynapseError(400, "Users are still joined to this room")

await self.store.purge_room(room_id)

@defer.inlineCallbacks
def get_messages(
self,
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -738,6 +739,7 @@ def register_servlets(hs, http_server):
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)

Expand Down
57 changes: 57 additions & 0 deletions synapse/rest/admin/purge_room_servlet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re

from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.rest.admin import assert_requester_is_admin


class PurgeRoomServlet(RestServlet):
"""Servlet which will remove all trace of a room from the database

POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}

returns:

{}
"""

PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)

def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
self.hs = hs
self.auth = hs.get_auth()
self.pagination_handler = hs.get_pagination_handler()

async def on_POST(self, request):
await assert_requester_is_admin(self.auth, request)

body = parse_json_object_from_request(request)
assert_params_in_dict(body, ("room_id",))

await self.pagination_handler.purge_room(body["room_id"])

return (200, {})
137 changes: 137 additions & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,143 @@ def _find_unreferenced_groups_during_purge(self, txn, state_groups):

return to_delete, to_dedelta

def purge_room(self, room_id):
"""Deletes all record of a room

Args:
room_id (str):
"""

return self.runInteraction("purge_room", self._purge_room_txn, room_id)

def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)

txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)

txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)

txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# and then tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
"event_push_actions_staging",
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"redactions",
"rejections",
"state_events",
):
logger.info("[purge] removing %s from %s", room_id, table)

txn.execute(
"""
DELETE FROM %s WHERE event_id IN (
SELECT event_id FROM events WHERE room_id=?
)
"""
% (table,),
(room_id,),
)

# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
"event_push_actions",
"event_search",
"events",
"group_rooms",
"public_room_list_stream",
"receipts_graph",
"receipts_linearized",
"room_aliases",
"room_depth",
"room_memberships",
"room_state",
"room_stats",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
"topics",
"users_in_public_rooms",
"users_who_share_private_rooms",
# no useful index, but let's clear them anyway
"appservice_room_list",
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
"group_summary_rooms",
"local_invites",
"room_account_data",
"room_tags",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))

# Other tables we do NOT need to clear out:
#
# - blocked_rooms
# This is important, to make sure that we don't accidentally rejoin a blocked
# room after it was purged
#
# - user_directory
# This has a room_id column, but it is unused
#

# Other tables that we might want to consider clearing out include:
#
# - event_reports
# Given that these are intended for abuse management my initial
# inclination is to leave them in place.
#
# - current_state_delta_stream
# - ex_outlier_stream
# - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)

# TODO: we could probably usefully do a bunch of cache invalidation here

logger.info("[purge] done")

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
Expand Down