Take out lock while persisting events / deleting room
erikjohnston committed Jul 19, 2023
1 parent b965651 commit 8de5e05
Showing 8 changed files with 112 additions and 39 deletions.
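Every file in this commit applies the same pattern: code paths that write events into a room take the shared ("read") side of a `delete_room_lock`, keyed on the room ID, via `acquire_read_write_lock(DELETE_ROOM_LOCK_NAME, room_id, write=False)`, while the room-purge path takes the exclusive ("write") side. The sketch below is a minimal, self-contained asyncio model of that reader–writer behaviour, written only to illustrate the semantics: `ToyReadWriteLock`, `persist_event` and `delete_room` are stand-ins invented here, not Synapse's storage-backed `WorkerLocksHandler`. Later sketches on this page reuse `ToyReadWriteLock` and `DELETE_ROOM_LOCK_NAME` from this block.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import DefaultDict, Set, Tuple

DELETE_ROOM_LOCK_NAME = "delete_room_lock"


class ToyReadWriteLock:
    """Toy in-process reader-writer lock keyed by (lock_name, key).

    Any number of readers may hold an entry at once; a writer waits for
    all readers to finish and excludes new readers while it runs.
    """

    def __init__(self) -> None:
        self._readers: DefaultDict[Tuple[str, str], int] = defaultdict(int)
        self._writers: Set[Tuple[str, str]] = set()
        self._cond = asyncio.Condition()

    @asynccontextmanager
    async def acquire(self, name: str, key: str, write: bool):
        entry = (name, key)
        async with self._cond:
            if write:
                # Writers wait until nobody else holds the entry at all.
                await self._cond.wait_for(
                    lambda: self._readers[entry] == 0 and entry not in self._writers
                )
                self._writers.add(entry)
            else:
                # Readers only wait for a writer; they never block each
                # other, so taking the lock on every event-write path is cheap.
                await self._cond.wait_for(lambda: entry not in self._writers)
                self._readers[entry] += 1
        try:
            yield
        finally:
            async with self._cond:
                if write:
                    self._writers.discard(entry)
                else:
                    self._readers[entry] -= 1
                self._cond.notify_all()


async def persist_event(locks: ToyReadWriteLock, room_id: str) -> None:
    # Event-writing paths (send, persist, membership, room upgrade) take
    # the shared side, so they all run concurrently with each other.
    async with locks.acquire(DELETE_ROOM_LOCK_NAME, room_id, write=False):
        await asyncio.sleep(0.1)  # stand-in for persisting the event


async def delete_room(locks: ToyReadWriteLock, room_id: str) -> None:
    # Room deletion takes the exclusive side, so it waits for in-flight
    # writes and blocks new ones until the purge is finished.
    async with locks.acquire(DELETE_ROOM_LOCK_NAME, room_id, write=True):
        await asyncio.sleep(0.1)  # stand-in for purging the room


async def main() -> None:
    locks = ToyReadWriteLock()
    room_id = "!room:example.org"
    await asyncio.gather(persist_event(locks, room_id), delete_room(locks, room_id))


asyncio.run(main())
```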
17 changes: 14 additions & 3 deletions synapse/federation/federation_server.py
@@ -63,6 +63,7 @@
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
@@ -137,6 +138,7 @@ def __init__(self, hs: "HomeServer"):
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()
self._e2e_keys_handler = hs.get_e2e_keys_handler()
self._worker_lock_handler = hs.get_worker_locks_handler()

self._state_storage_controller = hs.get_storage_controllers().state

@@ -1236,9 +1238,18 @@ async def _process_incoming_pdus_in_room_inner(
logger.info("handling received PDU in room %s: %s", room_id, event)
try:
with nested_logging_context(event.event_id):
await self._federation_event_handler.on_receive_pdu(
origin, event
)
# We're taking out a lock within a lock, which could
# lead to deadlocks if we're not careful. However, it is
# safe on this occasion as we only ever take a write
# lock when deleting a room, which we would never do
# while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
# lock.
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
await self._federation_event_handler.on_receive_pdu(
origin, event
)
except FederationError as e:
# XXX: Ideally we'd inform the remote we failed to process
# the event, but we can't return an error in the transaction
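The comment added above is about lock ordering: the inbound-PDU path already holds `_INBOUND_EVENT_HANDLING_LOCK_NAME` when it takes the delete-room read lock, and nesting locks is only safe if no other code path ever acquires the same two locks in the opposite order. Below is a self-contained sketch of that hazard, using plain `asyncio.Lock`s as stand-ins; the names in the comments are labels only, not Synapse objects.

```python
import asyncio


async def take_in_order(first: asyncio.Lock, second: asyncio.Lock) -> None:
    # Take one lock, then (while still holding it) take another.
    async with first:
        await asyncio.sleep(0.1)  # let the other coroutine grab its first lock
        async with second:
            pass


async def main() -> None:
    inbound_lock = asyncio.Lock()  # stand-in for _INBOUND_EVENT_HANDLING_LOCK_NAME
    delete_lock = asyncio.Lock()   # stand-in for the delete-room lock

    # Opposite acquisition orders: the classic deadlock. Each task ends up
    # waiting for the lock the other one holds.
    try:
        await asyncio.wait_for(
            asyncio.gather(
                take_in_order(inbound_lock, delete_lock),
                take_in_order(delete_lock, inbound_lock),
            ),
            timeout=1.0,
        )
    except asyncio.TimeoutError:
        print("deadlocked: each task waits on the lock the other holds")

    # Consistent ordering: because room deletion never takes its lock while
    # holding the inbound-event lock, every path that needs both effectively
    # takes them in the same order, so the nested acquisition is safe.
    await asyncio.gather(
        take_in_order(inbound_lock, delete_lock),
        take_in_order(inbound_lock, delete_lock),
    )
    print("consistent ordering: both tasks complete")


asyncio.run(main())
```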
38 changes: 37 additions & 1 deletion synapse/handlers/message.py
@@ -53,6 +53,7 @@
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -485,6 +486,7 @@ def __init__(self, hs: "HomeServer"):
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
self._worker_lock_handler = hs.get_worker_locks_handler()

self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state

@@ -1010,6 +1012,37 @@ async def create_and_send_nonmember_event(
event.internal_metadata.stream_ordering,
)

async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
return await self._create_and_send_nonmember_event(
requester=requester,
event_dict=event_dict,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
ratelimit=ratelimit,
txn_id=txn_id,
ignore_shadow_ban=ignore_shadow_ban,
outlier=outlier,
depth=depth,
)

async def _create_and_send_nonmember_event(
self,
requester: Requester,
event_dict: dict,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
room_id = event_dict["room_id"]

# If we don't have any prev event IDs specified then we need to
# check that the host is in the room (as otherwise populating the
# prev events will fail), at which point we may as well check the
@@ -1924,7 +1957,10 @@ async def _send_dummy_events_to_fill_extremities(self) -> None:
)

for room_id in room_ids:
dummy_event_sent = await self._send_dummy_event_for_room(room_id)
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
dummy_event_sent = await self._send_dummy_event_for_room(room_id)

if not dummy_event_sent:
# Did not find a valid user in the room, so remove from future attempts
7 changes: 5 additions & 2 deletions synapse/handlers/pagination.py
@@ -48,6 +48,8 @@

PURGE_HISTORY_LOCK_NAME = "purge_history_lock"

DELETE_ROOM_LOCK_NAME = "delete_room_lock"


@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
@@ -418,8 +420,9 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=True
async with self._worker_locks.acquire_multi_read_write_lock(
[(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
write=True,
):
# first check that we have no users in this room
if not force:
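The purge path is the other side of the pattern: `purge_room` now takes both the purge-history lock and the delete-room lock, in write mode, through a single `acquire_multi_read_write_lock` call. Continuing the toy model from the first sketch, one simple way to implement a multi-lock acquire is to enter the individual locks in a fixed (sorted) order so that callers needing overlapping sets cannot deadlock; the `acquire_multi` helper, the ordering strategy, and the trimmed `purge_room` body are illustrative assumptions — the diff only shows the call site, not how Synapse's real `acquire_multi_read_write_lock` works internally.

```python
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Iterable, Tuple

PURGE_HISTORY_LOCK_NAME = "purge_history_lock"


@asynccontextmanager
async def acquire_multi(
    locks: ToyReadWriteLock, entries: Iterable[Tuple[str, str]], write: bool
):
    # Enter every (lock_name, key) pair in a fixed order and release them
    # together on exit. Sorting gives all callers the same acquisition
    # order, which rules out lock-ordering deadlocks between them.
    async with AsyncExitStack() as stack:
        for name, key in sorted(entries):
            await stack.enter_async_context(locks.acquire(name, key, write=write))
        yield


async def purge_room(locks: ToyReadWriteLock, room_id: str) -> None:
    # Mirrors the new purge_room: hold BOTH write locks, so neither another
    # purge nor any event-writing path (which holds the delete-room read
    # lock) can run for this room while it is being deleted.
    async with acquire_multi(
        locks,
        [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
        write=True,
    ):
        ...  # delete the room's events, state and other data here
```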
45 changes: 25 additions & 20 deletions synapse/handlers/room_member.py
@@ -39,6 +39,7 @@
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -94,6 +95,7 @@ def __init__(self, hs: "HomeServer"):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
self._worker_lock_handler = hs.get_worker_locks_handler()

self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -638,26 +640,29 @@ async def update_membership(
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
depth=depth,
origin_server_ts=origin_server_ts,
)
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
depth=depth,
origin_server_ts=origin_server_ts,
)

return result

3 changes: 3 additions & 0 deletions synapse/handlers/worker_lock.py
@@ -42,6 +42,9 @@
from synapse.server import HomeServer


DELETE_ROOM_LOCK_NAME = "delete_room_lock"


class WorkerLocksHandler:
"""A class for waiting on taking out locks, rather than using the storage
functions directly (which don't support awaiting).
3 changes: 2 additions & 1 deletion synapse/replication/tcp/handler.py
@@ -249,7 +249,8 @@ def __init__(self, hs: "HomeServer"):
if self._is_master or self._should_insert_client_ips:
self.subscribe_to_channel("USER_IP")

self._notifier.add_lock_released_callback(self.on_lock_released)
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)

def subscribe_to_channel(self, channel_name: str) -> None:
"""
11 changes: 8 additions & 3 deletions synapse/rest/client/room_upgrade_rest_servlet.py
@@ -17,6 +17,7 @@

from synapse.api.errors import Codes, ShadowBanError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
@@ -60,6 +61,7 @@ def __init__(self, hs: "HomeServer"):
self._hs = hs
self._room_creation_handler = hs.get_room_creation_handler()
self._auth = hs.get_auth()
self._worker_lock_handler = hs.get_worker_locks_handler()

async def on_POST(
self, request: SynapseRequest, room_id: str
@@ -78,9 +80,12 @@ async def on_POST(
)

try:
new_room_id = await self._room_creation_handler.upgrade_room(
requester, room_id, new_version
)
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
new_room_id = await self._room_creation_handler.upgrade_room(
requester, room_id, new_version
)
except ShadowBanError:
# Generate a random room ID.
new_room_id = stringutils.random_string(18)
27 changes: 18 additions & 9 deletions synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import (
SynapseTags,
@@ -338,6 +339,7 @@ def __init__(
)
self._state_resolution_handler = hs.get_state_resolution_handler()
self._state_controller = state_controller
self.hs = hs

async def _process_event_persist_queue_task(
self,
@@ -350,15 +352,22 @@ async def _process_event_persist_queue_task(
A dictionary of event ID to event ID we didn't persist as we already
had another event persisted with the same TXN ID.
"""
if isinstance(task, _PersistEventsTask):
return await self._persist_event_batch(room_id, task)
elif isinstance(task, _UpdateCurrentStateTask):
await self._update_current_state(room_id, task)
return {}
else:
raise AssertionError(
f"Found an unexpected task type in event persistence queue: {task}"
)

# Ensure that the room can't be deleted while we're persisting events to
# it. We might already have taken out the lock, but since this is just a
# "read" lock its inherently reentrant.
async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
if isinstance(task, _PersistEventsTask):
return await self._persist_event_batch(room_id, task)
elif isinstance(task, _UpdateCurrentStateTask):
await self._update_current_state(room_id, task)
return {}
else:
raise AssertionError(
f"Found an unexpected task type in event persistence queue: {task}"
)

@trace
async def persist_events(
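The comment added here leans on the read lock being reentrant: the persistence queue may be reached from code that already holds the delete-room read lock for the same room (for example the federation path or `create_and_send_nonmember_event` above), and because readers never wait for other readers, re-taking the shared side cannot block on the lock already held. A small usage example against the toy lock from the first sketch; the function name is invented for illustration.

```python
async def persist_with_nested_read_lock(locks: ToyReadWriteLock, room_id: str) -> None:
    # Outer acquisition: e.g. the event-creation or federation code path.
    async with locks.acquire(DELETE_ROOM_LOCK_NAME, room_id, write=False):
        # Inner acquisition: the persistence queue task takes the lock again.
        # Readers in the toy lock only wait for a writer that holds the entry,
        # so this cannot block on the read lock we already hold -- the
        # "reentrant" property the comment above relies on.
        async with locks.acquire(DELETE_ROOM_LOCK_NAME, room_id, write=False):
            ...  # persist the batch of events
```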
