Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster room joins: Resume state re-syncing after a Synapse restart #12813

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12812.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Try other homeservers when re-syncing state for rooms with partial state.
1 change: 1 addition & 0 deletions changelog.d/12813.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Resume state re-syncing for rooms with partial state after a Synapse restart.
2 changes: 1 addition & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ async def get_room_state_ids(
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
raise Exception("invalid response from /state_ids")
raise InvalidResponseError("invalid response from /state_ids")

return state_event_ids, auth_event_ids

Expand Down
94 changes: 84 additions & 10 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
import logging
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)

import attr
from signedjson.key import decode_verify_key_bytes
Expand All @@ -34,6 +43,7 @@
CodeMessageException,
Codes,
FederationDeniedError,
FederationError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
Expand Down Expand Up @@ -158,6 +168,11 @@ def __init__(self, hs: "HomeServer"):

self.third_party_event_rules = hs.get_third_party_event_rules()

if not hs.config.worker.worker_app:
run_as_background_process(
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
)
squahtx marked this conversation as resolved.
Show resolved Hide resolved

async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
Expand Down Expand Up @@ -459,6 +474,8 @@ async def do_invite_join(
"""
# TODO: We should be able to call this on workers, but the upgrading of
# room stuff after join currently doesn't work on workers.
# TODO: Before we relax this condition, we need to allow re-syncing of
# partial room state to happen on workers.
assert self.config.worker.worker_app is None

logger.debug("Joining %s to %s", joinee, room_id)
Expand Down Expand Up @@ -539,12 +556,11 @@ async def do_invite_join(
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
#
# TODO(faster_joins): pick this up again on restart
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
destination=origin,
destinations=ret.servers_in_room,
room_id=room_id,
)

Expand Down Expand Up @@ -1441,17 +1457,35 @@ async def get_room_complexity(
# well.
return None

async def _resume_sync_partial_state_room(self) -> None:
    """Restart the state resync for every room still marked as partial-state.

    Invoked once at startup on the main process only, so that resyncs that
    were interrupted by a restart get picked up again.
    """
    assert not self.config.worker.worker_app

    rooms = await self.store.get_partial_state_rooms_and_servers()
    for room_id, servers in rooms.items():
        # NOTE(review): the homeserver the original resync was pulling from is
        # not recorded across restarts, so we pass destination=None and supply
        # the full list of servers known to be in the room as candidates.
        run_as_background_process(
            desc="sync_partial_state_room",
            func=self._sync_partial_state_room,
            destination=None,
            destinations=servers,
            room_id=room_id,
        )
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not see a simple way to determine the destination the original sync was using after a restart, so I took the entire list of servers in the room to try.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think this is ok for now, but it definitely leads to less than satisfactory results while it works its way through the entire server list of Matrix HQ trying to find a server which will respond.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened #12999 to track this


async def _sync_partial_state_room(
self,
destination: str,
destination: Optional[str],
destinations: Collection[str],
room_id: str,
) -> None:
"""Background process to resync the state of a partial-state room

Args:
destination: homeserver to pull the state from
destination: the initial homeserver to pull the state from
destinations: other homeservers to try to pull the state from, if
`destination` is unavailable
room_id: room to be resynced
"""
assert not self.config.worker.worker_app
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is implied by the identical assert in do_invite_join, which starts the background task.


# TODO(faster_joins): do we need to lock to avoid races? What happens if other
# worker processes kick off a resync in parallel? Perhaps we should just elect
Expand All @@ -1460,9 +1494,19 @@ async def _sync_partial_state_room(
# TODO(faster_joins): what happens if we leave the room during a resync? if we
# really leave, that might mean we have difficulty getting the room state over
# federation.
#
# TODO(faster_joins): try other destinations if the one we have fails

# Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes.
if destination is not None:
# Move `destination` to the front of the list.
destinations = list(destinations)
if destination in destinations:
destinations.remove(destination)
destinations = [destination] + destinations
destination_iter = itertools.cycle(destinations)

# `destination` is now the current remote homeserver we're pulling from.
destination = next(destination_iter)
logger.info("Syncing state for room %s via %s", room_id, destination)

# we work through the queue in order of increasing stream ordering.
Expand Down Expand Up @@ -1498,6 +1542,36 @@ async def _sync_partial_state_room(
allow_rejected=True,
)
for event in events:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
for attempt in range(len(destinations)):
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
break
except FederationError as e:
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
logger.error(
"Failed to get state for %s at %s from %s because %s, "
"giving up!",
room_id,
event,
destination,
e,
)
raise

# Try the next remote server.
logger.info(
"Failed to get state for %s at %s from %s because %s",
room_id,
event,
destination,
e,
)
destination = next(destination_iter)
logger.info(
"Syncing state for room %s via %s instead",
room_id,
destination,
)
7 changes: 7 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ async def update_state_for_partial_state_event(
Args:
destination: server to request full state from
event: partial-state event to be de-partial-stated

Raises:
FederationError if we fail to request state from the remote server.
"""
logger.info("Updating state for %s", event.event_id)
with nested_logging_context(suffix=event.event_id):
Expand Down Expand Up @@ -792,6 +795,10 @@ async def _resolve_state_at_missing_prevs(
Returns:
if we already had all the prev events, `None`. Otherwise, returns a list of
the events in the state at `event`.

Raises:
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
room_id = event.room_id
event_id = event.event_id
Expand Down
29 changes: 29 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Collection,
Dict,
List,
Mapping,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -1077,6 +1078,34 @@ def get_rooms_for_retention_period_in_range_txn(
get_rooms_for_retention_period_in_range_txn,
)

async def get_partial_state_rooms_and_servers(
    self,
) -> Mapping[str, Collection[str]]:
    """Get all rooms containing events with partial state, and the servers known
    to be in the room.

    Returns:
        A dictionary of rooms with partial state, with room IDs as keys and
        lists of servers in rooms as values.
    """
    room_servers: Dict[str, List[str]] = {}

    rows = await self.db_pool.simple_select_list(
        "partial_state_rooms_servers",
        keyvalues={},
        retcols=("room_id", "server_name"),
        desc="get_partial_state_rooms",
    )

    for row in rows:
        # Group server names by room; setdefault avoids the explicit
        # membership check before appending.
        room_servers.setdefault(row["room_id"], []).append(row["server_name"])

    return room_servers

async def clear_partial_state_room(self, room_id: str) -> bool:
# this can race with incoming events, so we watch out for FK errors.
# TODO(faster_joins): this still doesn't completely fix the race, since the persist process
Expand Down