Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Delete current state when server leaves a room #6792

Merged
merged 6 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6792.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Delete current state from the database when server leaves a room.
183 changes: 111 additions & 72 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event_dict
Expand Down Expand Up @@ -468,84 +469,93 @@ def _update_current_state_txn(
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert

# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
#
# The stream_id for the update is chosen to be the minimum of the stream_ids
# for the batch of the events that we are persisting; that means we do not
# end up in a situation where workers see events before the
# current_state_delta updates.
#
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
if delta_state.no_longer_in_room:
# Server is no longer in the room so we delete the room from
# current_state_events, being careful we've already updated the
# rooms.room_version column (which gets populated in a
# background task).
self._upsert_room_version_txn(txn, room_id)

# Before deleting we populate the current_state_delta_stream
# so that async background tasks get told what happened.
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, room_id, type, state_key, null, event_id
FROM current_state_events
WHERE room_id = ?
"""
txn.execute(sql, (stream_id, room_id))

self.db.simple_delete_txn(
txn, table="current_state_events", keyvalues={"room_id": room_id},
)
"""
txn.executemany(
sql,
(
(
stream_id,
room_id,
etype,
state_key,
None,
room_id,
etype,
state_key,
else:
# We're still in the room, so we update the current state as normal.

# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
#
# The stream_id for the update is chosen to be the minimum of the stream_ids
# for the batch of the events that we are persisting; that means we do not
# end up in a situation where workers see events before the
# current_state_delta updates.
#
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
)
for etype, state_key in to_delete
# We sanity check that we're deleting rather than updating
if (etype, state_key) not in to_insert
),
)
txn.executemany(
sql,
(
"""
txn.executemany(
sql,
(
stream_id,
room_id,
etype,
state_key,
ev_id,
room_id,
etype,
state_key,
)
for (etype, state_key), ev_id in iteritems(to_insert)
),
)
(
stream_id,
room_id,
etype,
state_key,
to_insert.get((etype, state_key)),
room_id,
etype,
state_key,
)
for etype, state_key in itertools.chain(to_delete, to_insert)
),
)
# Now we actually update the current_state_events table

# Now we actually update the current_state_events table
txn.executemany(
"DELETE FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?",
(
(room_id, etype, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
),
)

txn.executemany(
"DELETE FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?",
(
(room_id, etype, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
),
)
# We include the membership in the current state table, hence we do
# a lookup when we insert. This assumes that all events have already
# been inserted into room_memberships.
txn.executemany(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
for key, ev_id in iteritems(to_insert)
],
)

# We include the membership in the current state table, hence we do
# a lookup when we insert. This assumes that all events have already
# been inserted into room_memberships.
txn.executemany(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
for key, ev_id in iteritems(to_insert)
],
)
# We now update `local_current_membership`. We do this regardless
# of if we're still in the room or not to handle the case where
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# e.g. we just got banned (where we need to record that fact here).

# Note: Do we really want to delete rows here (that we do not
# subsequently reinsert below)? While technically correct it means
Expand Down Expand Up @@ -601,6 +611,35 @@ def _update_current_state_txn(

self._invalidate_state_caches_and_stream(txn, room_id, members_changed)

def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
"""Update the room version in the database based off current state
events.

This is used when we're about to delete current state and we want to
ensure that the `rooms.room_version` column is up to date.
"""

sql = """
SELECT json FROM event_json
INNER JOIN current_state_events USING (room_id, event_id)
WHERE room_id = ? AND type = ? AND state_key = ?
"""
txn.execute(sql, (room_id, EventTypes.Create, ""))
row = txn.fetchone()
if row:
event_json = json.loads(row[0])
content = event_json.get("content", {})
creator = content.get("creator")
room_version_id = content.get("room_version", RoomVersions.V1.identifier)

self.db.simple_upsert_txn(
txn,
table="rooms",
keyvalues={"room_id": room_id},
values={"room_version": room_version_id},
insertion_values={"is_public": False, "creator": creator},
)

def _update_forward_extremities_txn(
self, txn, new_forward_extremities, max_stream_order
):
Expand Down
79 changes: 76 additions & 3 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import logging
from collections import deque, namedtuple
from typing import Iterable, List, Optional, Tuple
Expand All @@ -27,7 +28,7 @@

from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, Membership
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
Expand Down Expand Up @@ -72,17 +73,20 @@
)


@attr.s(slots=True, frozen=True)
@attr.s(slots=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.

Attributes:
to_delete: List of type/state_keys to delete from current state
to_insert: Map of state to upsert into current state
no_longer_in_room: The server is not longer in the room, so the room
should e.g. be removed from `current_state_events` table.
"""

to_delete = attr.ib(type=List[Tuple[str, str]])
to_insert = attr.ib(type=StateMap[str])
no_longer_in_room = attr.ib(type=bool, default=False)


class _EventPeristenceQueue(object):
Expand Down Expand Up @@ -396,18 +400,35 @@ async def _persist_events(
# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
delta = None
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
state_delta_for_room[room_id] = DeltaState([], delta_ids)
delta = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(
self._clock, "persist_events.calculate_state_delta"
):
delta = await self._calculate_state_delta(
room_id, current_state
)

if delta:
# If we have a change of state then lets check
# whether we're actually still a member of the room,
# or if our last user left. If we're no longer in
# the room then we delete the current state and
# extremities.
is_still_joined = await self._is_server_still_joined(
room_id, ev_ctx_rm, delta, current_state
)
if not is_still_joined:
logger.info("Server no longer in room %s", room_id)
latest_event_ids = []
current_state = {}
delta.no_longer_in_room = True

state_delta_for_room[room_id] = delta

# If we have the current_state then lets prefill
Expand Down Expand Up @@ -660,3 +681,55 @@ async def _calculate_state_delta(
}

return DeltaState(to_delete=to_delete, to_insert=to_insert)

async def _is_server_still_joined(
self,
room_id: str,
ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
delta: DeltaState,
current_state: Optional[StateMap[str]],
) -> bool:
# Now we check if we're actually still in the room,
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# or if all our users have left.
if not any(
self.is_mine_id(state_key)
for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
if typ == EventTypes.Member
):
# There have been no changes to membership of our users, so nothing
# has changed and we assume we're still in the room.
return True

# Check if any of the given events are a local join that appear in the
# current state
for (typ, state_key), event_id in delta.to_insert.items():
if typ != EventTypes.Member or not self.is_mine_id(state_key):
continue

for event, _ in ev_ctx_rm:
if event_id == event.event_id:
if event.membership == Membership.JOIN:
return True

# There's been a change of membership but we don't have a local join
# event in the new events, so we need to check the full state.
if current_state is None:
current_state = await self.main_store.get_current_state_ids(room_id)
current_state = dict(current_state)
for key in delta.to_delete:
current_state.pop(key, None)

current_state.update(delta.to_insert)

event_ids = [
event_id
for (typ, state_key,), event_id in current_state.items()
if typ == EventTypes.Member and self.is_mine_id(state_key)
]

rows = await self.main_store.get_membership_from_event_ids(event_ids)
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
if is_still_joined:
return True
else:
return False