From bd2caa952981dd91ef6006314402734df405c043 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 15:21:54 +0000 Subject: [PATCH 01/11] Scaffolding for background process to refresh profiles --- synapse/handlers/user_directory.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0815be79fa54..093b07fe12db 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -75,6 +75,9 @@ def __init__(self, hs: "HomeServer"): # Guard to ensure we only process deltas one at a time self._is_processing = False + # Guard to ensure we only have one process for refreshing remote profiles + self._is_refreshing_remote_profiles = False + if self.update_user_directory: self.notifier.add_replication_callback(self.notify_new_event) @@ -82,6 +85,9 @@ def __init__(self, hs: "HomeServer"): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) + # Kick off the profile refresh process on startup + self.clock.call_later(10, self.kick_off_remote_profile_refresh_process) + async def search_users( self, user_id: str, search_term: str, limit: int ) -> SearchResult: @@ -505,3 +511,23 @@ async def _handle_possible_remote_profile_change( # Only update if something has changed, or we didn't have a previous event # in the first place. 
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + + def kick_off_remote_profile_refresh_process(self) -> None: + """Called when there may be remote users with stale profiles to be refreshed""" + if not self.update_user_directory: + return + + if self._is_refreshing_remote_profiles: + return + + async def process() -> None: + try: + await self._unsafe_refresh_remote_profiles() + finally: + self._is_refreshing_remote_profiles = False + + self._is_refreshing_remote_profiles = True + run_as_background_process("user_directory.refresh_remote_profiles", process) + + async def _unsafe_refresh_remote_profiles(self) -> None: + pass From 36d4d0ac452829d2250da0d106b02925f377aa85 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 15:24:56 +0000 Subject: [PATCH 02/11] Add scaffolding for background process to refresh profiles for a given server --- synapse/handlers/user_directory.py | 34 ++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 093b07fe12db..9befca93b888 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -78,6 +78,11 @@ def __init__(self, hs: "HomeServer"): # Guard to ensure we only have one process for refreshing remote profiles self._is_refreshing_remote_profiles = False + # Guard to ensure we only have one process for refreshing remote profiles + # for the given servers. + # Set of server names. 
+ self._is_refreshing_remote_profiles_for_servers: Set[str] = set() + if self.update_user_directory: self.notifier.add_replication_callback(self.notify_new_event) @@ -531,3 +536,32 @@ async def process() -> None: async def _unsafe_refresh_remote_profiles(self) -> None: pass + + def kick_off_remote_profile_refresh_process_for_remote_server( + self, server_name: str + ) -> None: + """Called when there may be remote users with stale profiles to be refreshed + on the given server.""" + if not self.update_user_directory: + return + + if server_name in self._is_refreshing_remote_profiles_for_servers: + return + + async def process() -> None: + try: + await self._unsafe_refresh_remote_profiles_for_remote_server( + server_name + ) + finally: + self._is_refreshing_remote_profiles_for_servers.remove(server_name) + + self._is_refreshing_remote_profiles_for_servers.add(server_name) + run_as_background_process( + "user_directory.refresh_remote_profiles_for_remote_server", process + ) + + async def _unsafe_refresh_remote_profiles_for_remote_server( + self, server_name: str + ) -> None: + pass From 4c9670254ddab2774186ebaf198aefd5b73389ae Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 15:38:21 +0000 Subject: [PATCH 03/11] Implement the code to select servers to refresh from --- synapse/handlers/user_directory.py | 42 ++++++++++++++++++- .../storage/databases/main/user_directory.py | 28 +++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 9befca93b888..6f5c70a9c620 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -33,6 +33,14 @@ # then be coalesced such that only one /profile request is made). USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000 +# Maximum number of remote servers that we will attempt to refresh profiles for +# in one go. 
+MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5 + +# As long as we have servers to refresh (without backoff), keep adding more +# every 15 seconds. +INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15 + class UserDirectoryHandler(StateDeltasHandler): """Handles queries and updates for the user_directory. @@ -535,7 +543,39 @@ async def process() -> None: run_as_background_process("user_directory.refresh_remote_profiles", process) async def _unsafe_refresh_remote_profiles(self) -> None: - pass + limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len( + self._is_refreshing_remote_profiles_for_servers + ) + if limit <= 0: + # nothing to do: already refreshing the maximum number of servers + # at once. + # Come back later. + self.clock.call_later( + INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, + self.kick_off_remote_profile_refresh_process, + ) + return + + servers_to_refresh = ( + await self.store.get_remote_servers_with_profiles_to_refresh( + now_ts=self.clock.time_msec(), limit=limit + ) + ) + + if not servers_to_refresh: + # TODO Do we have any backing-off servers that we should try again + # for eventually? 
+ return + + for server_to_refresh in servers_to_refresh: + self.kick_off_remote_profile_refresh_process_for_remote_server( + server_to_refresh + ) + + self.clock.call_later( + INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, + self.kick_off_remote_profile_refresh_process, + ) def kick_off_remote_profile_refresh_process_for_remote_server( self, server_name: str diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 9cf01b7f36ba..f12e0ca21648 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -504,6 +504,34 @@ async def set_remote_user_profile_in_user_dir_stale( desc="set_remote_user_profile_in_user_dir_stale", ) + async def get_remote_servers_with_profiles_to_refresh( + self, now_ts: int, limit: int + ) -> List[str]: + """ + Get a list of up to `limit` server names which have users whose + locally-cached profiles we believe to be stale + and are refreshable given the current time `now_ts` in milliseconds. + """ + + def _get_remote_servers_with_refreshable_profiles_txn( + txn: LoggingTransaction, + ) -> List[str]: + sql = """ + SELECT user_server_name + FROM user_directory_stale_remote_users + WHERE next_try_at_ts < ? + GROUP BY user_server_name + ORDER BY MIN(next_try_at_ts), user_server_name + LIMIT ? 
+ """ + txn.execute(sql, (now_ts, limit)) + return [row[0] for row in txn] + + return await self.db_pool.runInteraction( + "get_remote_servers_with_profiles_to_refresh", + _get_remote_servers_with_refreshable_profiles_txn, + ) + async def update_profile_in_user_dir( self, user_id: str, display_name: Optional[str], avatar_url: Optional[str] ) -> None: From b45bf6d4d55e5405b31ef9c8eba008f62546c1f3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 15:51:27 +0000 Subject: [PATCH 04/11] Ensure we don't build up multiple looping calls --- synapse/handlers/user_directory.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 6f5c70a9c620..bd5737a91eeb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -15,6 +15,8 @@ import logging from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +from twisted.internet.interfaces import IDelayedCall + import synapse.metrics from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler @@ -85,6 +87,8 @@ def __init__(self, hs: "HomeServer"): # Guard to ensure we only have one process for refreshing remote profiles self._is_refreshing_remote_profiles = False + # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process` + self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None # Guard to ensure we only have one process for refreshing remote profiles # for the given servers. 
@@ -99,7 +103,9 @@ def __init__(self, hs: "HomeServer"): self.clock.call_later(0, self.notify_new_event) # Kick off the profile refresh process on startup - self.clock.call_later(10, self.kick_off_remote_profile_refresh_process) + self._refresh_remote_profiles_call_later = self.clock.call_later( + 10, self.kick_off_remote_profile_refresh_process + ) async def search_users( self, user_id: str, search_term: str, limit: int @@ -533,6 +539,11 @@ def kick_off_remote_profile_refresh_process(self) -> None: if self._is_refreshing_remote_profiles: return + if self._refresh_remote_profiles_call_later: + if self._refresh_remote_profiles_call_later.active(): + self._refresh_remote_profiles_call_later.cancel() + self._refresh_remote_profiles_call_later = None + async def process() -> None: try: await self._unsafe_refresh_remote_profiles() @@ -550,7 +561,7 @@ async def _unsafe_refresh_remote_profiles(self) -> None: # nothing to do: already refreshing the maximum number of servers # at once. # Come back later. 
- self.clock.call_later( + self._refresh_remote_profiles_call_later = self.clock.call_later( INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, self.kick_off_remote_profile_refresh_process, ) @@ -572,7 +583,7 @@ async def _unsafe_refresh_remote_profiles(self) -> None: server_to_refresh ) - self.clock.call_later( + self._refresh_remote_profiles_call_later = self.clock.call_later( INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, self.kick_off_remote_profile_refresh_process, ) @@ -604,4 +615,4 @@ async def process() -> None: async def _unsafe_refresh_remote_profiles_for_remote_server( self, server_name: str ) -> None: - pass + logger.info("Refreshing profiles in user directory for %s", server_name) From fddb6f5064d5e68ae244a1f502bc382970a4f596 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 16:31:10 +0000 Subject: [PATCH 05/11] Make `get_profile` able to respect backoffs --- synapse/handlers/profile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 4bf9a047a3bc..9a81a77cbd3b 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -63,7 +63,7 @@ def __init__(self, hs: "HomeServer"): self._third_party_rules = hs.get_third_party_event_rules() - async def get_profile(self, user_id: str) -> JsonDict: + async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict: target_user = UserID.from_string(user_id) if self.hs.is_mine(target_user): @@ -81,7 +81,7 @@ async def get_profile(self, user_id: str) -> JsonDict: destination=target_user.domain, query_type="profile", args={"user_id": user_id}, - ignore_backoff=True, + ignore_backoff=ignore_backoff, ) return result except RequestSendFailed as e: From 36d19e372597084c5c12e591e1f59f3dcd1ec01d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 16:31:41 +0000 Subject: [PATCH 06/11] Add logic for refreshing users --- 
synapse/handlers/user_directory.py | 92 +++++++++++++++++++ .../storage/databases/main/user_directory.py | 45 +++++++++ 2 files changed, 137 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index bd5737a91eeb..cd0c2a384688 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -13,17 +13,21 @@ # limitations under the License. import logging +from http import HTTPStatus from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple from twisted.internet.interfaces import IDelayedCall import synapse.metrics from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership +from synapse.api.errors import Codes, SynapseError from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases.main.user_directory import SearchResult from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure +from synapse.util.retryutils import NotRetryingDestination +from synapse.util.stringutils import non_null_str_or_none if TYPE_CHECKING: from synapse.server import HomeServer @@ -44,6 +48,17 @@ INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15 +def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int: + """ + Calculates the time of a next retry given `now_ts` in ms and the number + of failures encountered thus far. + + Currently the sequence goes: + 1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week + """ + return now_ts + 60_000 * (5 ** min(retry_count, 7)) + + class UserDirectoryHandler(StateDeltasHandler): """Handles queries and updates for the user_directory. 
@@ -79,6 +94,8 @@ def __init__(self, hs: "HomeServer"): self.update_user_directory = hs.config.worker.should_update_user_directory self.search_all_users = hs.config.userdirectory.user_directory_search_all_users self.spam_checker = hs.get_spam_checker() + self._hs = hs + # The current position in the current_state_delta stream self.pos: Optional[int] = None @@ -616,3 +633,78 @@ async def _unsafe_refresh_remote_profiles_for_remote_server( self, server_name: str ) -> None: logger.info("Refreshing profiles in user directory for %s", server_name) + + while True: + # Get a handful of users to process. + next_batch = await self.store.get_remote_users_to_refresh_on_server( + server_name, now_ts=self.clock.time_msec(), limit=10 + ) + if not next_batch: + # Finished for now + return + + for user_id, retry_counter in next_batch: + # Request the profile of the user. + try: + profile = await self._hs.get_profile_handler().get_profile( + user_id, ignore_backoff=False + ) + except NotRetryingDestination as e: + logger.info( + "Failed to refresh profile for %r because the destination is undergoing backoff", + user_id, + ) + # As a special-case, we back off until the destination is no longer + # backed off from. + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + e.retry_last_ts + e.retry_interval, + retry_counter=retry_counter + 1, + ) + continue + except SynapseError as e: + if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND: + # The profile doesn't exist. + # TODO Does this mean we should clear it from our user + # directory? 
+ await self.store.clear_remote_user_profile_in_user_dir_stale( + user_id + ) + logger.warning( + "Refresh of remote profile %r: not found (%r)", + user_id, + e.msg, + ) + continue + + logger.warning( + "Failed to refresh profile for %r because %r", user_id, e + ) + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + calculate_time_of_next_retry( + self.clock.time_msec(), retry_counter + 1 + ), + retry_counter=retry_counter + 1, + ) + continue + except Exception: + logger.error( + "Failed to refresh profile for %r due to unhandled exception", + user_id, + exc_info=True, + ) + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + calculate_time_of_next_retry( + self.clock.time_msec(), retry_counter + 1 + ), + retry_counter=retry_counter + 1, + ) + continue + + await self.store.update_profile_in_user_dir( + user_id, + display_name=non_null_str_or_none(profile.get("displayname")), + avatar_url=non_null_str_or_none(profile.get("avatar_url")), + ) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index f12e0ca21648..4dd7c083c84d 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -504,6 +504,19 @@ async def set_remote_user_profile_in_user_dir_stale( desc="set_remote_user_profile_in_user_dir_stale", ) + async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None: + """ + Marks a remote user as no longer having a possibly-stale user directory profile. + + Args: + user_id: the remote user who no longer has a stale profile on this server. 
+ """ + await self.db_pool.simple_delete( + table="user_directory_stale_remote_users", + keyvalues={"user_id": user_id}, + desc="clear_remote_user_profile_in_user_dir_stale", + ) + async def get_remote_servers_with_profiles_to_refresh( self, now_ts: int, limit: int ) -> List[str]: @@ -532,6 +545,38 @@ def _get_remote_servers_with_refreshable_profiles_txn( _get_remote_servers_with_refreshable_profiles_txn, ) + async def get_remote_users_to_refresh_on_server( + self, server_name: str, now_ts: int, limit: int + ) -> List[Tuple[str, int]]: + """ + Get a list of up to `limit` user IDs from the server `server_name` + whose locally-cached profiles we believe to be stale + and are refreshable given the current time `now_ts` in milliseconds. + + Returns: + tuple of: + - User ID + - Retry counter (number of failures so far) + """ + + def _get_remote_users_to_refresh_on_server_txn( + txn: LoggingTransaction, + ) -> List[Tuple[str, int]]: + sql = """ + SELECT user_id, retry_counter + FROM user_directory_stale_remote_users + WHERE user_server_name = ? AND next_try_at_ts < ? + ORDER BY next_try_at_ts + LIMIT ? 
+ """ + txn.execute(sql, (server_name, now_ts, limit)) + return txn.fetchall() + + return await self.db_pool.runInteraction( + "get_remote_users_to_refresh_on_server", + _get_remote_users_to_refresh_on_server_txn, + ) + async def update_profile_in_user_dir( self, user_id: str, display_name: Optional[str], avatar_url: Optional[str] ) -> None: From 080698ffe0c06559f647aae9a2fc4059993d1ef3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 16:37:27 +0000 Subject: [PATCH 07/11] When backing off, schedule a refresh when the backoff is over --- synapse/handlers/user_directory.py | 27 ++++++++++++++++--- .../storage/databases/main/user_directory.py | 9 ++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index cd0c2a384688..34abd6556ced 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -591,8 +591,29 @@ async def _unsafe_refresh_remote_profiles(self) -> None: ) if not servers_to_refresh: - # TODO Do we have any backing-off servers that we should try again - # for eventually? + # Do we have any backing-off servers that we should try again + # for eventually? + end_of_time = 1 << 62 + backing_off_servers = ( + await self.store.get_remote_servers_with_profiles_to_refresh( + now_ts=end_of_time, limit=1 + ) + ) + if backing_off_servers: + # Find out when the next user is refreshable and schedule a + # refresh then. 
+ backing_off_server_name = backing_off_servers[0] + users = await self.store.get_remote_users_to_refresh_on_server( + backing_off_server_name, now_ts=end_of_time, limit=1 + ) + if not users: + return + _, _, next_try_at_ts = users[0] + self._refresh_remote_profiles_call_later = self.clock.call_later( + ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2, + self.kick_off_remote_profile_refresh_process, + ) + return for server_to_refresh in servers_to_refresh: @@ -643,7 +664,7 @@ async def _unsafe_refresh_remote_profiles_for_remote_server( # Finished for now return - for user_id, retry_counter in next_batch: + for user_id, retry_counter, _ in next_batch: # Request the profile of the user. try: profile = await self._hs.get_profile_handler().get_profile( diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 4dd7c083c84d..97f09b73dde6 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -547,7 +547,7 @@ def _get_remote_servers_with_refreshable_profiles_txn( async def get_remote_users_to_refresh_on_server( self, server_name: str, now_ts: int, limit: int - ) -> List[Tuple[str, int]]: + ) -> List[Tuple[str, int, int]]: """ Get a list of up to `limit` user IDs from the server `server_name` whose locally-cached profiles we believe to be stale @@ -557,20 +557,21 @@ async def get_remote_users_to_refresh_on_server( tuple of: - User ID - Retry counter (number of failures so far) + - Time the retry is scheduled for, in milliseconds """ def _get_remote_users_to_refresh_on_server_txn( txn: LoggingTransaction, - ) -> List[Tuple[str, int]]: + ) -> List[Tuple[str, int, int]]: sql = """ - SELECT user_id, retry_counter + SELECT user_id, retry_counter, next_try_at_ts FROM user_directory_stale_remote_users WHERE user_server_name = ? AND next_try_at_ts < ? ORDER BY next_try_at_ts LIMIT ? 
""" txn.execute(sql, (server_name, now_ts, limit)) - return txn.fetchall() + return cast(List[Tuple[str, int, int]], txn.fetchall()) return await self.db_pool.runInteraction( "get_remote_users_to_refresh_on_server", From 0e1866f6bb2eed764867c5a16ef550c0bb8be887 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 16:45:03 +0000 Subject: [PATCH 08/11] Wake up the background processes when we receive an interesting state event --- synapse/handlers/user_directory.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 34abd6556ced..e16e57748ac6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -25,6 +25,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases.main.user_directory import SearchResult from synapse.storage.roommember import ProfileInfo +from synapse.types import UserID from synapse.util.metrics import Measure from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import non_null_str_or_none @@ -525,6 +526,20 @@ async def _handle_possible_remote_profile_change( next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS, retry_counter=0, ) + # Schedule a wake-up to refresh the user directory for this server. + # We intentionally wake up this server directly because we don't want + # other servers ahead of it in the queue to get in the way of updating + # the profile if the server only just sent us an event. + self.clock.call_later( + USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + self.kick_off_remote_profile_refresh_process_for_remote_server, + UserID.from_string(user_id).domain, + ) + # Schedule a wake-up to handle any backoffs that may occur in the future. 
+ self.clock.call_later( + 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + self.kick_off_remote_profile_refresh_process, + ) return prev_name = prev_event.content.get("displayname") From 0433e88f361a562a685eb0f5927f83a5006444d1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 18:14:44 +0000 Subject: [PATCH 09/11] Add tests --- tests/handlers/test_user_directory.py | 187 +++++++++++++++++++++++++- 1 file changed, 185 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index a02c1c62279a..da4d24082648 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -19,17 +19,18 @@ import synapse.rest.admin from synapse.api.constants import UserTypes +from synapse.api.errors import SynapseError from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.appservice import ApplicationService from synapse.rest.client import login, register, room, user_directory from synapse.server import HomeServer from synapse.storage.roommember import ProfileInfo -from synapse.types import UserProfile, create_requester +from synapse.types import JsonDict, UserProfile, create_requester from synapse.util import Clock from tests import unittest from tests.storage.test_user_directory import GetUserDirectoryTables -from tests.test_utils import make_awaitable +from tests.test_utils import event_injection, make_awaitable from tests.test_utils.event_injection import inject_member_event from tests.unittest import override_config @@ -1103,3 +1104,185 @@ def test_disabling_room_list(self) -> None: ) self.assertEqual(200, channel.code, channel.result) self.assertTrue(len(channel.json_body["results"]) == 0) + + +class UserDirectoryRemoteProfileTestCase(unittest.HomeserverTestCase): + servlets = [ + login.register_servlets, + synapse.rest.admin.register_servlets, + register.register_servlets, + room.register_servlets, + ] + + def 
default_config(self) -> JsonDict: + config = super().default_config() + # Re-enables updating the user directory, as that functionality is needed below. + config["update_user_directory_from_worker"] = None + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.alice = self.register_user("alice", "alice123") + self.alice_tok = self.login("alice", "alice123") + self.user_dir_helper = GetUserDirectoryTables(self.store) + self.user_dir_handler = hs.get_user_directory_handler() + self.profile_handler = hs.get_profile_handler() + + # Cancel the startup call: in the steady-state case we can't rely on it anyway. + assert self.user_dir_handler._refresh_remote_profiles_call_later is not None + self.user_dir_handler._refresh_remote_profiles_call_later.cancel() + + def test_public_rooms_have_profiles_collected(self) -> None: + """ + In a public room, member state events are treated as reflecting the user's + real profile and they are accepted. + (The main motivation for accepting this is to prevent having to query + *every* single profile change over federation.) + """ + room_id = self.helper.create_room_as( + self.alice, is_public=True, tok=self.alice_tok + ) + self.get_success( + event_injection.inject_member_event( + self.hs, + room_id, + "@bruce:remote", + "join", + "@bruce:remote", + extra_content={ + "displayname": "Bruce!", + "avatar_url": "mxc://remote/123", + }, + ) + ) + # Sending this event makes the streams move forward after the injection... 
+ self.helper.send(room_id, "Test", tok=self.alice_tok) + self.pump(0.1) + + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertEqual( + profiles.get("@bruce:remote"), + ProfileInfo(display_name="Bruce!", avatar_url="mxc://remote/123"), + ) + + def test_private_rooms_do_not_have_profiles_collected(self) -> None: + """ + In a private room, member state events are not pulled out and used to populate + the user directory. + """ + room_id = self.helper.create_room_as( + self.alice, is_public=False, tok=self.alice_tok + ) + self.get_success( + event_injection.inject_member_event( + self.hs, + room_id, + "@bruce:remote", + "join", + "@bruce:remote", + extra_content={ + "displayname": "super-duper bruce", + "avatar_url": "mxc://remote/456", + }, + ) + ) + # Sending this event makes the streams move forward after the injection... + self.helper.send(room_id, "Test", tok=self.alice_tok) + self.pump(0.1) + + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertNotIn("@bruce:remote", profiles) + + def test_private_rooms_have_profiles_requested(self) -> None: + """ + When a name changes in a private room, the homeserver instead requests + the user's global profile over federation. + """ + + async def get_remote_profile( + user_id: str, ignore_backoff: bool = True + ) -> JsonDict: + if user_id == "@bruce:remote": + return { + "displayname": "Sir Bruce Bruceson", + "avatar_url": "mxc://remote/789", + } + else: + raise ValueError(f"unable to fetch {user_id}") + + with patch.object(self.profile_handler, "get_profile", get_remote_profile): + # Continue from the earlier test... 
+ self.test_private_rooms_do_not_have_profiles_collected() + + # Advance by a minute + self.reactor.advance(61.0) + + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertEqual( + profiles.get("@bruce:remote"), + ProfileInfo( + display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789" + ), + ) + + def test_profile_requests_are_retried(self) -> None: + """ + When we fail to fetch the user's profile over federation, + we try again later. + """ + has_failed_once = False + + async def get_remote_profile( + user_id: str, ignore_backoff: bool = True + ) -> JsonDict: + nonlocal has_failed_once + if user_id == "@bruce:remote": + if not has_failed_once: + has_failed_once = True + raise SynapseError(502, "temporary network problem") + + return { + "displayname": "Sir Bruce Bruceson", + "avatar_url": "mxc://remote/789", + } + else: + raise ValueError(f"unable to fetch {user_id}") + + with patch.object(self.profile_handler, "get_profile", get_remote_profile): + # Continue from the earlier test... + self.test_private_rooms_do_not_have_profiles_collected() + + # Advance by a minute + self.reactor.advance(61.0) + + # The request has already failed once + self.assertTrue(has_failed_once) + + # The profile has yet to be updated. 
+ profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertNotIn( + "@bruce:remote", + profiles, + ) + + # Advance by five minutes, after the backoff has finished + self.reactor.advance(301.0) + + # The profile should have been updated now + profiles = self.get_success( + self.user_dir_helper.get_profiles_in_user_directory() + ) + self.assertEqual( + profiles.get("@bruce:remote"), + ProfileInfo( + display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789" + ), + ) From e26901e3b4190b463e0d0bdad2122f4840b6c0af Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Dec 2022 18:28:10 +0000 Subject: [PATCH 10/11] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/14756.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14756.bugfix diff --git a/changelog.d/14756.bugfix b/changelog.d/14756.bugfix new file mode 100644 index 000000000000..12f979e9d041 --- /dev/null +++ b/changelog.d/14756.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change. \ No newline at end of file From 2995db11af58ac198f751cbf98bedc255a7d4686 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 28 Feb 2023 10:35:49 +0000 Subject: [PATCH 11/11] Add comment about 1<<62 --- synapse/handlers/user_directory.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e16e57748ac6..28a92d41d6fc 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -608,6 +608,9 @@ async def _unsafe_refresh_remote_profiles(self) -> None: if not servers_to_refresh: # Do we have any backing-off servers that we should try again # for eventually? 
+ # By setting `now` to a point in the far future, we can ask for + # which server/user is next to be refreshed, even though it is + # not actually refreshable *now*. end_of_time = 1 << 62 backing_off_servers = ( await self.store.get_remote_servers_with_profiles_to_refresh(