From df8b91ed2bba4995c59a5b067e3b252ab90c9a5e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 28 Sep 2022 15:26:16 -0500 Subject: [PATCH] Limit and filter the number of backfill points to get from the database (#13879) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There is no need to grab thousands of backfill points when we only need 5 to make the `/backfill` request with. We need to grab a few extra in case the first few aren't visible in the history. Previously, we grabbed thousands of backfill points from the database, then sorted and filtered them in the app. Fetching the 4.6k backfill points for `#matrix:matrix.org` from the database takes ~50ms - ~570ms so it's not like this saves a lot of time 🤷. But it might save us more time now that `get_backfill_points_in_room`/`get_insertion_event_backward_extremities_in_room` are more complicated after https://github.com/matrix-org/synapse/pull/13635 This PR moves the filtering and limiting to the SQL query so we just have less data to work with in the first place. Part of https://github.com/matrix-org/synapse/issues/13356 --- changelog.d/13879.misc | 1 + synapse/handlers/federation.py | 109 ++++++++++-------- .../databases/main/event_federation.py | 90 +++++++++++++-- tests/storage/test_event_federation.py | 80 +++++++++---- 4 files changed, 198 insertions(+), 82 deletions(-) create mode 100644 changelog.d/13879.misc diff --git a/changelog.d/13879.misc b/changelog.d/13879.misc new file mode 100644 index 000000000000..3cc2a2420fa0 --- /dev/null +++ b/changelog.d/13879.misc @@ -0,0 +1 @@ +Only pull relevant backfill points from the database based on the current depth and limit (instead of all) every time we want to `/backfill`. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 360ab6fee290..500c1c16d006 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -38,7 +38,7 @@ from unpaddedbase64 import decode_base64 from synapse import event_auth -from synapse.api.constants import EventContentFields, EventTypes, Membership +from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership from synapse.api.errors import ( AuthError, CodeMessageException, @@ -211,7 +211,7 @@ async def _maybe_backfill_inner( current_depth: int, limit: int, *, - processing_start_time: int, + processing_start_time: Optional[int], ) -> bool: """ Checks whether the `current_depth` is at or approaching any backfill @@ -223,12 +223,23 @@ async def _maybe_backfill_inner( room_id: The room to backfill in. current_depth: The depth to check at for any upcoming backfill points. limit: The max number of events to request from the remote federated server. - processing_start_time: The time when `maybe_backfill` started - processing. Only used for timing. + processing_start_time: The time when `maybe_backfill` started processing. + Only used for timing. If `None`, no timing observation will be made. """ backwards_extremities = [ _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY) - for event_id, depth in await self.store.get_backfill_points_in_room(room_id) + for event_id, depth in await self.store.get_backfill_points_in_room( + room_id=room_id, + current_depth=current_depth, + # We only need to end up with 5 extremities combined with the + # insertion event extremities to make the `/backfill` request + # but fetch an order of magnitude more to make sure there is + # enough even after we filter them by whether visible in the + # history. This isn't fool-proof as all backfill points within + # our limit could be filtered out but seems like a good amount + # to try with at least. + limit=50, + ) ] insertion_events_to_be_backfilled: List[_BackfillPoint] = [] @@ -236,7 +247,12 @@ async def _maybe_backfill_inner( insertion_events_to_be_backfilled = [ _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT) for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room( - room_id + room_id=room_id, + current_depth=current_depth, + # We only need to end up with 5 extremities combined with + # the backfill points to make the `/backfill` request ... + # (see the other comment above for more context). + limit=50, ) ] logger.debug( @@ -245,10 +261,6 @@ async def _maybe_backfill_inner( insertion_events_to_be_backfilled, ) - if not backwards_extremities and not insertion_events_to_be_backfilled: - logger.debug("Not backfilling as no extremeties found.") - return False - # we now have a list of potential places to backpaginate from. We prefer to # start with the most recent (ie, max depth), so let's sort the list. sorted_backfill_points: List[_BackfillPoint] = sorted( @@ -269,6 +281,33 @@ async def _maybe_backfill_inner( sorted_backfill_points, ) + # If we have no backfill points lower than the `current_depth` then + # either we can a) bail or b) still attempt to backfill. We opt to try + # backfilling anyway just in case we do get relevant events. + if not sorted_backfill_points and current_depth != MAX_DEPTH: + logger.debug( + "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points." + ) + return await self._maybe_backfill_inner( + room_id=room_id, + # We use `MAX_DEPTH` so that we find all backfill points next + # time (all events are below the `MAX_DEPTH`) + current_depth=MAX_DEPTH, + limit=limit, + # We don't want to start another timing observation from this + # nested recursive call. The top-most call can record the time + # overall otherwise the smaller one will throw off the results. + processing_start_time=None, + ) + + # Even after recursing with `MAX_DEPTH`, we didn't find any + # backward extremities to backfill from. + if not sorted_backfill_points: + logger.debug( + "_maybe_backfill_inner: Not backfilling as no backward extremeties found." + ) + return False + # If we're approaching an extremity we trigger a backfill, otherwise we # no-op. # @@ -278,47 +317,16 @@ async def _maybe_backfill_inner( # chose more than one times the limit in case of failure, but choosing a # much larger factor will result in triggering a backfill request much # earlier than necessary. - # - # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't - # care about events that have happened after our current position. - # - max_depth = sorted_backfill_points[0].depth - if current_depth - 2 * limit > max_depth: + max_depth_of_backfill_points = sorted_backfill_points[0].depth + if current_depth - 2 * limit > max_depth_of_backfill_points: logger.debug( "Not backfilling as we don't need to. %d < %d - 2 * %d", - max_depth, + max_depth_of_backfill_points, current_depth, limit, ) return False - # We ignore extremities that have a greater depth than our current depth - # as: - # 1. we don't really care about getting events that have happened - # after our current position; and - # 2. we have likely previously tried and failed to backfill from that - # extremity, so to avoid getting "stuck" requesting the same - # backfill repeatedly we drop those extremities. - # - # However, we need to check that the filtered extremities are non-empty. - # If they are empty then either we can a) bail or b) still attempt to - # backfill. We opt to try backfilling anyway just in case we do get - # relevant events. - # - filtered_sorted_backfill_points = [ - t for t in sorted_backfill_points if t.depth <= current_depth - ] - if filtered_sorted_backfill_points: - logger.debug( - "_maybe_backfill_inner: backfill points before current depth: %s", - filtered_sorted_backfill_points, - ) - sorted_backfill_points = filtered_sorted_backfill_points - else: - logger.debug( - "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway." - ) - # For performance's sake, we only want to paginate from a particular extremity # if we can actually see the events we'll get. Otherwise, we'd just spend a lot # of resources to get redacted events. We check each extremity in turn and @@ -452,10 +460,15 @@ async def try_backfill(domains: Collection[str]) -> bool: return False - processing_end_time = self.clock.time_msec() - backfill_processing_before_timer.observe( - (processing_end_time - processing_start_time) / 1000 - ) + # If we have the `processing_start_time`, then we can make an + # observation. We wouldn't have the `processing_start_time` in the case + # where `_maybe_backfill_inner` is recursively called to find any + # backfill points regardless of `current_depth`. + if processing_start_time is not None: + processing_end_time = self.clock.time_msec() + backfill_processing_before_timer.observe( + (processing_end_time - processing_start_time) / 1000 + ) success = await try_backfill(likely_domains) if success: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 3251fca6fbf2..17f2fd4458ab 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -726,17 +726,35 @@ def _get_auth_chain_difference_txn( async def get_backfill_points_in_room( self, room_id: str, + current_depth: int, + limit: int, ) -> List[Tuple[str, int]]: """ - Gets the oldest events(backwards extremities) in the room along with the - approximate depth. Sorted by depth, highest to lowest (descending). + Get the backward extremities to backfill from in the room along with the + approximate depth. + + Only returns events that are at a depth lower than or + equal to the `current_depth`. Sorted by depth, highest to lowest (descending) + so the closest events to the `current_depth` are first in the list. + + We ignore extremities that are newer than the user's current scroll position + (ie, those with depth greater than `current_depth`) as: + 1. we don't really care about getting events that have happened + after our current position; and + 2. by the nature of paginating and scrolling back, we have likely + previously tried and failed to backfill from that extremity, so + to avoid getting "stuck" requesting the same backfill repeatedly + we drop those extremities. Args: room_id: Room where we want to find the oldest events + current_depth: The depth at the user's current scrollback position + limit: The max number of backfill points to return Returns: List of (event_id, depth) tuples. Sorted by depth, highest to lowest - (descending) + (descending) so the closest events to the `current_depth` are first + in the list. """ def get_backfill_points_in_room_txn( @@ -784,6 +802,18 @@ def get_backfill_points_in_room_txn( * necessarily safe to assume that it will have been completed. */ AND edge.is_state is ? /* False */ + /** + * We only want backwards extremities that are older than or at + * the same position of the given `current_depth` (where older + * means less than the given depth) because we're looking backwards + * from the `current_depth` when backfilling. + * + * current_depth (ignore events that come after this, ignore 2-4) + * | + * ▼ + * [0]<--[1]<--[2]<--[3]<--[4] + */ + AND event.depth <= ? /* current_depth */ /** * Exponential back-off (up to the upper bound) so we don't retry the * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. @@ -798,11 +828,13 @@ def get_backfill_points_in_room_txn( OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */) ) /** - * Sort from highest to the lowest depth. Then tie-break on - * alphabetical order of the event_ids so we get a consistent - * ordering which is nice when asserting things in tests. + * Sort from highest (closest to the `current_depth`) to the lowest depth + * because the closest are most relevant to backfill from first. + * Then tie-break on alphabetical order of the event_ids so we get a + * consistent ordering which is nice when asserting things in tests. */ ORDER BY event.depth DESC, backward_extrem.event_id DESC + LIMIT ? """ if isinstance(self.database_engine, PostgresEngine): @@ -817,9 +849,11 @@ def get_backfill_points_in_room_txn( ( room_id, False, + current_depth, self._clock.time_msec(), 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS, + limit, ), ) @@ -835,18 +869,34 @@ def get_backfill_points_in_room_txn( async def get_insertion_event_backward_extremities_in_room( self, room_id: str, + current_depth: int, + limit: int, ) -> List[Tuple[str, int]]: """ Get the insertion events we know about that we haven't backfilled yet - along with the approximate depth. Sorted by depth, highest to lowest - (descending). + along with the approximate depth. Only returns insertion events that are + at a depth lower than or equal to the `current_depth`. Sorted by depth, + highest to lowest (descending) so the closest events to the + `current_depth` are first in the list. + + We ignore insertion events that are newer than the user's current scroll + position (ie, those with depth greater than `current_depth`) as: + 1. we don't really care about getting events that have happened + after our current position; and + 2. by the nature of paginating and scrolling back, we have likely + previously tried and failed to backfill from that insertion event, so + to avoid getting "stuck" requesting the same backfill repeatedly + we drop those insertion event. Args: room_id: Room where we want to find the oldest events + current_depth: The depth at the user's current scrollback position + limit: The max number of insertion event extremities to return Returns: List of (event_id, depth) tuples. Sorted by depth, highest to lowest - (descending) + (descending) so the closest events to the `current_depth` are first + in the list. """ def get_insertion_event_backward_extremities_in_room_txn( @@ -869,6 +919,18 @@ def get_insertion_event_backward_extremities_in_room_txn( AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id WHERE insertion_event_extremity.room_id = ? + /** + * We only want extremities that are older than or at + * the same position of the given `current_depth` (where older + * means less than the given depth) because we're looking backwards + * from the `current_depth` when backfilling. + * + * current_depth (ignore events that come after this, ignore 2-4) + * | + * ▼ + * [0]<--[1]<--[2]<--[3]<--[4] + */ + AND event.depth <= ? /* current_depth */ /** * Exponential back-off (up to the upper bound) so we don't retry the * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc @@ -883,11 +945,13 @@ def get_insertion_event_backward_extremities_in_room_txn( OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */) ) /** - * Sort from highest to the lowest depth. Then tie-break on - * alphabetical order of the event_ids so we get a consistent - * ordering which is nice when asserting things in tests. + * Sort from highest (closest to the `current_depth`) to the lowest depth + * because the closest are most relevant to backfill from first. + * Then tie-break on alphabetical order of the event_ids so we get a + * consistent ordering which is nice when asserting things in tests. */ ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC + LIMIT ? """ if isinstance(self.database_engine, PostgresEngine): @@ -901,9 +965,11 @@ def get_insertion_event_backward_extremities_in_room_txn( sql % (least_function,), ( room_id, + current_depth, self._clock.time_msec(), 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS, + limit, ), ) return cast(List[Tuple[str, int]], txn.fetchall()) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 85739c464ec7..398f338b6624 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -754,19 +754,31 @@ def populate_db(txn: LoggingTransaction): def test_get_backfill_points_in_room(self): """ - Test to make sure we get some backfill points + Test to make sure only backfill points that are older and come before + the `current_depth` are returned. """ setup_info = self._setup_room_for_backfill_tests() room_id = setup_info.room_id + depth_map = setup_info.depth_map + # Try at "B" backfill_points = self.get_success( - self.store.get_backfill_points_in_room(room_id) + self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertListEqual( backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"] ) + # Try at "A" + backfill_points = self.get_success( + self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + # Event "2" has a depth of 2 but is not included here because we only + # know the approximate depth of 5 from our event "3". + self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"]) + def test_get_backfill_points_in_room_excludes_events_we_have_attempted( self, ): @@ -776,6 +788,7 @@ def test_get_backfill_points_in_room_excludes_events_we_have_attempted( """ setup_info = self._setup_room_for_backfill_tests() room_id = setup_info.room_id + depth_map = setup_info.depth_map # Record some attempts to backfill these events which will make # `get_backfill_points_in_room` exclude them because we @@ -795,8 +808,9 @@ def test_get_backfill_points_in_room_excludes_events_we_have_attempted( # No time has passed since we attempted to backfill ^ + # Try at "B" backfill_points = self.get_success( - self.store.get_backfill_points_in_room(room_id) + self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] # Only the backfill points that we didn't record earlier exist here. @@ -812,6 +826,7 @@ def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duratio """ setup_info = self._setup_room_for_backfill_tests() room_id = setup_info.room_id + depth_map = setup_info.depth_map # Record some attempts to backfill these events which will make # `get_backfill_points_in_room` exclude them because we @@ -839,26 +854,24 @@ def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duratio # visible regardless. self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) - # Make sure that "b1" is not in the list because we've + # Try at "A" and make sure that "b1" is not in the list because we've # already attempted many times backfill_points = self.get_success( - self.store.get_backfill_points_in_room(room_id) + self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertListEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2"]) + self.assertListEqual(backfill_event_ids, ["b3", "b2"]) # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and # see if we can now backfill it self.reactor.advance(datetime.timedelta(hours=20).total_seconds()) - # Try again after we advanced enough time and we should see "b3" again + # Try at "A" again after we advanced enough time and we should see "b3" again backfill_points = self.get_success( - self.store.get_backfill_points_in_room(room_id) + self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertListEqual( - backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"] - ) + self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"]) def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo: """ @@ -938,19 +951,35 @@ def populate_db(txn: LoggingTransaction): def test_get_insertion_event_backward_extremities_in_room(self): """ - Test to make sure insertion event backward extremities are returned. + Test to make sure only insertion event backward extremities that are + older and come before the `current_depth` are returned. """ setup_info = self._setup_room_for_insertion_backfill_tests() room_id = setup_info.room_id + depth_map = setup_info.depth_map + # Try at "insertion_eventB" backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room(room_id) + self.store.get_insertion_event_backward_extremities_in_room( + room_id, depth_map["insertion_eventB"], limit=100 + ) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertListEqual( backfill_event_ids, ["insertion_eventB", "insertion_eventA"] ) + # Try at "insertion_eventA" + backfill_points = self.get_success( + self.store.get_insertion_event_backward_extremities_in_room( + room_id, depth_map["insertion_eventA"], limit=100 + ) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + # Event "2" has a depth of 2 but is not included here because we only + # know the approximate depth of 5 from our event "3". + self.assertListEqual(backfill_event_ids, ["insertion_eventA"]) + def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted( self, ): @@ -961,6 +990,7 @@ def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_hav """ setup_info = self._setup_room_for_insertion_backfill_tests() room_id = setup_info.room_id + depth_map = setup_info.depth_map # Record some attempts to backfill these events which will make # `get_insertion_event_backward_extremities_in_room` exclude them @@ -973,8 +1003,11 @@ def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_hav # No time has passed since we attempted to backfill ^ + # Try at "insertion_eventB" backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room(room_id) + self.store.get_insertion_event_backward_extremities_in_room( + room_id, depth_map["insertion_eventB"], limit=100 + ) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] # Only the backfill points that we didn't record earlier exist here. @@ -991,6 +1024,7 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ """ setup_info = self._setup_room_for_insertion_backfill_tests() room_id = setup_info.room_id + depth_map = setup_info.depth_map # Record some attempts to backfill these events which will make # `get_backfill_points_in_room` exclude them because we @@ -1027,13 +1061,15 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ # because we haven't waited long enough for this many attempts. self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) - # Make sure that "insertion_eventA" is not in the list because we've - # already attempted many times + # Try at "insertion_eventA" and make sure that "insertion_eventA" is not + # in the list because we've already attempted many times backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room(room_id) + self.store.get_insertion_event_backward_extremities_in_room( + room_id, depth_map["insertion_eventA"], limit=100 + ) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertListEqual(backfill_event_ids, ["insertion_eventB"]) + self.assertListEqual(backfill_event_ids, []) # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and # see if we can now backfill it @@ -1042,12 +1078,12 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ # Try at "insertion_eventA" again after we advanced enough time and we # should see "insertion_eventA" again backfill_points = self.get_success( - self.store.get_insertion_event_backward_extremities_in_room(room_id) + self.store.get_insertion_event_backward_extremities_in_room( + room_id, depth_map["insertion_eventA"], limit=100 + ) ) backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] - self.assertListEqual( - backfill_event_ids, ["insertion_eventB", "insertion_eventA"] - ) + self.assertListEqual(backfill_event_ids, ["insertion_eventA"]) @attr.s