
Refactor and convert Linearizer to async (#12357)
Refactor and convert `Linearizer` to async. This makes a `Linearizer`
cancellation bug easier to fix.

Also refactor to use an async context manager, which eliminates an
unlikely footgun where code that doesn't immediately use the context
manager could forget to release the lock.
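
For illustration only, the change to the calling convention looks roughly like this (`linearizer` and `key` are placeholders for whichever Linearizer instance and key a call site uses):

```python
# Before: `queue()` returned an awaitable that resolved to a plain context
# manager, so callers wrote `with (await ...)`. Nothing forced the caller to
# enter the context manager straight away, so the lock could be left held.
with (await linearizer.queue(key)):
    ...  # critical section, serialised per key

# After: `queue()` is used directly as an async context manager, tying
# acquisition and release of the lock to a single statement.
async with linearizer.queue(key):
    ...  # critical section, serialised per key
```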

Signed-off-by: Sean Quah <seanq@element.io>
squahtx authored Apr 5, 2022
1 parent ab3fdcf commit 800ba87
Showing 21 changed files with 104 additions and 115 deletions.
1 change: 1 addition & 0 deletions changelog.d/12357.misc
@@ -0,0 +1 @@
Refactor `Linearizer`, convert methods to async and use an async context manager.
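
As a rough sketch of the pattern named in the changelog entry (not the code from this commit — Synapse's actual `Linearizer` is Twisted-based and more featureful), a per-key lock exposed through an async context manager could look like the following; `TinyLinearizer` and its internals are hypothetical, using plain `asyncio`:

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import AsyncIterator, DefaultDict, Hashable


class TinyLinearizer:
    """Serialise work per key: later callers queue behind earlier holders of the same key."""

    def __init__(self) -> None:
        self._locks: DefaultDict[Hashable, asyncio.Lock] = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def queue(self, key: Hashable) -> AsyncIterator[None]:
        lock = self._locks[key]
        await lock.acquire()  # wait for any earlier holder of the same key
        try:
            yield
        finally:
            lock.release()  # released even if the guarded block raises


async def main() -> None:
    linearizer = TinyLinearizer()
    async with linearizer.queue(("origin.example", "!room:example")):
        ...  # at most one task at a time runs here for this key


asyncio.run(main())
```

Exposing `queue()` as an async context manager means the acquire and the release are part of one statement, which is what removes the forget-to-release footgun described above.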
10 changes: 5 additions & 5 deletions synapse/federation/federation_server.py
@@ -188,7 +188,7 @@ async def _handle_old_staged_events(self) -> None:
async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
-with (await self._server_linearizer.queue((origin, room_id))):
+async with self._server_linearizer.queue((origin, room_id)):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

@@ -218,7 +218,7 @@ async def on_timestamp_to_event_request(
Tuple indicating the response status code and dictionary response
body including `event_id`.
"""
-with (await self._server_linearizer.queue((origin, room_id))):
+async with self._server_linearizer.queue((origin, room_id)):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

@@ -529,7 +529,7 @@ async def on_room_state_request(
# in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
-with (await self._server_linearizer.queue((origin, room_id))):
+async with self._server_linearizer.queue((origin, room_id)):
resp: JsonDict = dict(
await self._state_resp_cache.wrap(
(room_id, event_id),
@@ -883,7 +883,7 @@ async def _on_send_membership_event(
async def on_event_auth(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:
-with (await self._server_linearizer.queue((origin, room_id))):
+async with self._server_linearizer.queue((origin, room_id)):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

@@ -945,7 +945,7 @@ async def on_get_missing_events(
latest_events: List[str],
limit: int,
) -> Dict[str, list]:
-with (await self._server_linearizer.queue((origin, room_id))):
+async with self._server_linearizer.queue((origin, room_id)):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

6 changes: 2 additions & 4 deletions synapse/handlers/appservice.py
@@ -330,10 +330,8 @@ async def _notify_interested_services_ephemeral(
continue

# Since we read/update the stream position for this AS/stream
-with (
-await self._ephemeral_events_linearizer.queue(
-(service.id, stream_key)
-)
+async with self._ephemeral_events_linearizer.queue(
+(service.id, stream_key)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
@@ -833,7 +833,7 @@ async def incoming_device_list_update(
async def _handle_device_updates(self, user_id: str) -> None:
"Actually handle pending updates."

-with (await self._remote_edu_linearizer.queue(user_id)):
+async with self._remote_edu_linearizer.queue(user_id):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
4 changes: 2 additions & 2 deletions synapse/handlers/e2e_keys.py
@@ -118,7 +118,7 @@ async def query_devices(
from_device_id: the device making the query. This is used to limit
the number of in-flight queries at a time.
"""
-with await self._query_devices_linearizer.queue((from_user_id, from_device_id)):
+async with self._query_devices_linearizer.queue((from_user_id, from_device_id)):
device_keys_query: Dict[str, Iterable[str]] = query_body.get(
"device_keys", {}
)
@@ -1386,7 +1386,7 @@ async def _handle_signing_key_updates(self, user_id: str) -> None:
device_handler = self.e2e_keys_handler.device_handler
device_list_updater = device_handler.device_list_updater

-with (await self._remote_edu_linearizer.queue(user_id)):
+async with self._remote_edu_linearizer.queue(user_id):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
14 changes: 7 additions & 7 deletions synapse/handlers/e2e_room_keys.py
@@ -83,7 +83,7 @@ async def get_room_keys(

# we deliberately take the lock to get keys so that changing the version
# works atomically
-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):
# make sure the backup version exists
try:
await self.store.get_e2e_room_keys_version_info(user_id, version)
@@ -126,7 +126,7 @@ async def delete_room_keys(
"""

# lock for consistency with uploading
-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):
# make sure the backup version exists
try:
version_info = await self.store.get_e2e_room_keys_version_info(
@@ -187,7 +187,7 @@ async def upload_room_keys(
# TODO: Validate the JSON to make sure it has the right keys.

# XXX: perhaps we should use a finer grained lock here?
-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):

# Check that the version we're trying to upload is the current version
try:
@@ -332,7 +332,7 @@ async def create_version(self, user_id: str, version_info: JsonDict) -> str:
# TODO: Validate the JSON to make sure it has the right keys.

# lock everyone out until we've switched version
-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):
new_version = await self.store.create_e2e_room_keys_version(
user_id, version_info
)
@@ -359,7 +359,7 @@ async def get_version_info(
}
"""

-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):
try:
res = await self.store.get_e2e_room_keys_version_info(user_id, version)
except StoreError as e:
@@ -383,7 +383,7 @@ async def delete_version(self, user_id: str, version: Optional[str] = None) -> N
NotFoundError: if this backup version doesn't exist
"""

-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):
try:
await self.store.delete_e2e_room_keys_version(user_id, version)
except StoreError as e:
@@ -413,7 +413,7 @@ async def update_version(
raise SynapseError(
400, "Version in body does not match", Codes.INVALID_PARAM
)
-with (await self._upload_linearizer.queue(user_id)):
+async with self._upload_linearizer.queue(user_id):
try:
old_info = await self.store.get_e2e_room_keys_version_info(
user_id, version
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
@@ -151,7 +151,7 @@ async def maybe_backfill(
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
-with (await self._room_backfill.queue(room_id)):
+async with self._room_backfill.queue(room_id):
return await self._maybe_backfill_inner(room_id, current_depth, limit)

async def _maybe_backfill_inner(
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
@@ -224,7 +224,7 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
len(missing_prevs),
shortstr(missing_prevs),
)
-with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+async with self._room_pdu_linearizer.queue(pdu.room_id):
logger.info(
"Acquired room lock to fetch %d missing prev_events",
len(missing_prevs),
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
@@ -851,7 +851,7 @@ async def create_and_send_nonmember_event(
# a situation where event persistence can't keep up, causing
# extremities to pile up, which in turn leads to state resolution
# taking longer.
-with (await self.limiter.queue(event_dict["room_id"])):
+async with self.limiter.queue(event_dict["room_id"]):
if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
event_dict["room_id"],
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
@@ -1030,7 +1030,7 @@ async def update_external_syncs_row(
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
-with (await self.external_sync_linearizer.queue(process_id)):
+async with self.external_sync_linearizer.queue(process_id):
prev_state = await self.current_state_for_user(user_id)

process_presence = self.external_process_to_current_syncs.setdefault(
@@ -1071,7 +1071,7 @@ async def update_external_syncs_clear(self, process_id: str) -> None:
Used when the process has stopped/disappeared.
"""
-with (await self.external_sync_linearizer.queue(process_id)):
+async with self.external_sync_linearizer.queue(process_id):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
2 changes: 1 addition & 1 deletion synapse/handlers/read_marker.py
@@ -40,7 +40,7 @@ async def received_client_read_marker(
the read marker has changed.
"""

-with await self.read_marker_linearizer.queue((room_id, user_id)):
+async with self.read_marker_linearizer.queue((room_id, user_id)):
existing_read_marker = await self.store.get_account_data_for_room_and_type(
user_id, room_id, "m.fully_read"
)
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
@@ -883,7 +883,7 @@ async def create_room(
#
# we also don't need to check the requester's shadow-ban here, as we
# have already done so above (and potentially emptied invite_list).
-with (await self.room_member_handler.member_linearizer.queue((room_id,))):
+async with self.room_member_handler.member_linearizer.queue((room_id,)):
content = {}
is_direct = config.get("is_direct", None)
if is_direct:
4 changes: 2 additions & 2 deletions synapse/handlers/room_member.py
@@ -515,8 +515,8 @@ async def update_membership(

# We first linearise by the application service (to try to limit concurrent joins
# by application services), and then by room ID.
-with (await self.member_as_limiter.queue(as_id)):
-with (await self.member_linearizer.queue(key)):
+async with self.member_as_limiter.queue(as_id):
+async with self.member_linearizer.queue(key):
result = await self.update_membership_locked(
requester,
target,
2 changes: 1 addition & 1 deletion synapse/handlers/sso.py
@@ -430,7 +430,7 @@ async def complete_sso_login_request(
# grab a lock while we try to find a mapping for this user. This seems...
# optimistic, especially for implementations that end up redirecting to
# interstitial pages.
-with await self._mapping_lock.queue(auth_provider_id):
+async with self._mapping_lock.queue(auth_provider_id):
# first of all, check if we already have a mapping for this user
user_id = await self.get_sso_user_by_remote_user_id(
auth_provider_id,
2 changes: 1 addition & 1 deletion synapse/push/bulk_push_rule_evaluator.py
@@ -397,7 +397,7 @@ async def get_rules(
self.room_push_rule_cache_metrics.inc_hits()
return self.data.rules_by_user

-with (await self.linearizer.queue(self.room_id)):
+async with self.linearizer.queue(self.room_id):
if state_group and self.data.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
self.room_push_rule_cache_metrics.inc_hits()
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
@@ -451,7 +451,7 @@ async def _save_and_send_ack(self) -> None:
# service for robustness? Or could we replace it with an assertion that
# we're not being re-entered?

-with (await self._fed_position_linearizer.queue(None)):
+async with self._fed_position_linearizer.queue(None):
# We persist and ack the same position, so we take a copy of it
# here as otherwise it can get modified from underneath us.
current_position = self.federation_position
6 changes: 3 additions & 3 deletions synapse/rest/media/v1/media_repository.py
@@ -258,7 +258,7 @@ async def get_remote_media(
# We linearize here to ensure that we don't try and download remote
# media multiple times concurrently
key = (server_name, media_id)
-with (await self.remote_media_linearizer.queue(key)):
+async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
server_name, media_id
)
@@ -294,7 +294,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
# We linearize here to ensure that we don't try and download remote
# media multiple times concurrently
key = (server_name, media_id)
-with (await self.remote_media_linearizer.queue(key)):
+async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
server_name, media_id
)
@@ -850,7 +850,7 @@ async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]:

# TODO: Should we delete from the backup store

-with (await self.remote_media_linearizer.queue(key)):
+async with self.remote_media_linearizer.queue(key):
full_path = self.filepaths.remote_media_filepath(origin, file_id)
try:
os.remove(full_path)
2 changes: 1 addition & 1 deletion synapse/state/__init__.py
@@ -573,7 +573,7 @@ async def resolve_state_groups(
"""
group_names = frozenset(state_groups_ids.keys())

-with (await self.resolve_linearizer.queue(group_names)):
+async with self.resolve_linearizer.queue(group_names):
cache = self._state_cache.get(group_names, None)
if cache:
return cache
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
@@ -888,7 +888,7 @@ async def _get_joined_hosts(
return frozenset(cache.hosts_to_joined_users)

# Since we'll mutate the cache we need to lock.
-with (await self._joined_host_linearizer.queue(room_id)):
+async with self._joined_host_linearizer.queue(room_id):
if state_entry.state_group == cache.state_group:
# Same state group, so nothing to do. We've already checked for
# this above, but the cache may have changed while waiting on
