From bd010d7a079631f34ef9a37aab27195161855686 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 15 Feb 2024 12:22:58 +0000 Subject: [PATCH 1/6] Include destinations we never sent to in catchup When picking destination servers that we need to wake up for a retry, we need to be mindful of destinations that we have *never* successfully sent to. This can manifest either as a null `last_successful_stream_ordering`, or even no row in `destinations` at all. Hence, we need to left-join on `destinations` rather than inner-joining, and we need to treat a null `last_successful_stream_ordering` the same as 0. --- .../storage/databases/main/transactions.py | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index c91c44818f3..bd50c30fab0 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -448,18 +448,35 @@ async def get_catch_up_outstanding_destinations( def _get_catch_up_outstanding_destinations_txn( txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str] ) -> List[str]: + # We're looking for destinations for which there is at least one room where we + # have an event that we have not yet sent to them, indicated by a row in + # `destination_rooms` with a `stream_ordering` older than the + # `last_successful_stream_ordering` (if any) in `destinations`. + # + # Of course, that may produce destinations where we are already busy sending + # the relevant PDU, but in that case, waking up the sender will just be a + # no-op. + # + # From that list, we need to *exclude* destinations which are subject to a + # backoff (ie, where `destinations.retry_last_ts + destinations.retry_interval` + # is in the future). 
There is also an edge-case where, if the server was + # previously shut down in the middle of the first send attempt to a given + # destination, there may be no row in `destinations` at all; we need to include + # such rows in the output, which means we need to left-join rather than + # inner-join against `destinations`. + q = """ - SELECT DISTINCT destination FROM destinations - INNER JOIN destination_rooms USING (destination) - WHERE - stream_ordering > last_successful_stream_ordering - AND destination > ? - AND ( - retry_last_ts IS NULL OR - retry_last_ts + retry_interval < ? - ) - ORDER BY destination - LIMIT 25 + SELECT DISTINCT destination FROM destination_rooms + LEFT JOIN destinations USING (destination) + WHERE + stream_ordering > COALESCE(last_successful_stream_ordering, 0) + AND destination > ? + AND ( + retry_last_ts IS NULL OR + retry_last_ts + retry_interval < ? + ) + ORDER BY destination + LIMIT 25 """ txn.execute( q, From 3e467d3af8a101ba6b5389b05d3d8fcc43b6a4f2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 15 Feb 2024 12:08:26 +0000 Subject: [PATCH 2/6] Include destinations with to-device messages in catchup When considering which destinations need waking up for a retry, also look for those that have outstanding to-device messages. --- .../storage/databases/main/transactions.py | 84 ++++++++++++++----- 1 file changed, 63 insertions(+), 21 deletions(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index bd50c30fab0..08e0241f683 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -423,8 +423,11 @@ async def get_catch_up_outstanding_destinations( self, after_destination: Optional[str] ) -> List[str]: """ - Gets at most 25 destinations which have outstanding PDUs to be caught up, - and are not being backed off from + Get a list of destinations we should retry transaction sending to. 
+ + Returns up to 25 destinations which have outstanding PDUs or to-device messages, + and are not subject to a backoff. + Args: after_destination: If provided, all destinations must be lexicographically greater @@ -448,47 +451,86 @@ async def get_catch_up_outstanding_destinations( def _get_catch_up_outstanding_destinations_txn( txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str] ) -> List[str]: - # We're looking for destinations for which there is at least one room where we - # have an event that we have not yet sent to them, indicated by a row in - # `destination_rooms` with a `stream_ordering` older than the - # `last_successful_stream_ordering` (if any) in `destinations`. + # We're looking for destinations which satisfy either of the following + # conditions: + # + # * There is at least one room where we have an event that we have not yet + # sent to them, indicated by a row in `destination_rooms` with a + # `stream_ordering` older than the `last_successful_stream_ordering` + # (if any) in `destinations`, or: + # + # * There is at least one to-device message outstanding for the destination, + # indicated by a row in `device_federation_outbox`. # # Of course, that may produce destinations where we are already busy sending - # the relevant PDU, but in that case, waking up the sender will just be a - # no-op. + # the relevant PDU or to-device message, but in that case, waking up the + # sender will just be a no-op. # - # From that list, we need to *exclude* destinations which are subject to a - # backoff (ie, where `destinations.retry_last_ts + destinations.retry_interval` + # From those two lists, we need to *exclude* destinations which are subject + # to a backoff (ie, where `destinations.retry_last_ts + destinations.retry_interval` # is in the future). 
There is also an edge-case where, if the server was # previously shut down in the middle of the first send attempt to a given # destination, there may be no row in `destinations` at all; we need to include # such rows in the output, which means we need to left-join rather than # inner-join against `destinations`. + # + # The two sources of destinations (`destination_rooms` and + # `device_federation_outbox`) are queried separately and UNIONed; but the list + # may be very long, and we don't want to return all the rows at once. We + # therefore sort the output and just return the first 25 rows. Obviously that + # means there is no point in either of the inner queries returning more than + # 25 results, since any further results are certain to be dropped by the outer + # LIMIT. In order to help the query-optimiser understand that, we *also* sort + # and limit the *inner* queries, hence we express them as CTEs rather than + # sub-queries. + # + # (NB: we make sure to do the top-level sort and limit on the database, rather + # than making two queries and combining the result in Python. We could otherwise + # suffer from slight differences in sort order between Python and the database, + # which would make the `after_destination` condition unreliable.) q = """ + WITH pdu_destinations AS ( SELECT DISTINCT destination FROM destination_rooms LEFT JOIN destinations USING (destination) WHERE - stream_ordering > COALESCE(last_successful_stream_ordering, 0) - AND destination > ? + destination > ? + AND destination_rooms.stream_ordering > COALESCE(destinations.last_successful_stream_ordering, 0) AND ( - retry_last_ts IS NULL OR - retry_last_ts + retry_interval < ? + destinations.retry_last_ts IS NULL OR + destinations.retry_last_ts + destinations.retry_interval < ? ) ORDER BY destination LIMIT 25 + ), to_device_destinations AS ( + SELECT DISTINCT destination FROM device_federation_outbox + LEFT JOIN destinations USING (destination) + WHERE + destination > ? 
+ AND ( + destinations.retry_last_ts IS NULL OR + destinations.retry_last_ts + destinations.retry_interval < ? + ) + ORDER BY destination + LIMIT 25 + ) + + SELECT destination FROM pdu_destinations + UNION SELECT destination FROM to_device_destinations + ORDER BY destination + LIMIT 25 """ + + # everything is lexicographically greater than "" so this gives + # us the first batch of up to 25. + after_destination = after_destination or "" + txn.execute( q, - ( - # everything is lexicographically greater than "" so this gives - # us the first batch of up to 25. - after_destination or "", - now_time_ms, - ), + (after_destination, now_time_ms, after_destination, now_time_ms), ) - destinations = [row[0] for row in txn] + return destinations async def get_destinations_paginate( From ce234c2ad059801bd6a4e026bede2a9022ad1267 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 15 Feb 2024 12:16:23 +0000 Subject: [PATCH 3/6] Kick off the destination wakeup job immediately at startup We don't really want people to have to wait 60 seconds for their to-device messages. --- synapse/federation/sender/__init__.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 9ed6fc98b51..6e64774f329 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -192,10 +192,9 @@ ) # Time (in s) to wait before trying to wake up destinations that have -# catch-up outstanding. This will also be the delay applied at startup -# before trying the same. +# catch-up outstanding. # Please note that rate limiting still applies, so while the loop is -# executed every X seconds the destinations may not be wake up because +# executed every X seconds the destinations may not be woken up because # they are being rate limited following previous attempt failures. 
WAKEUP_RETRY_PERIOD_SEC = 60 @@ -428,6 +427,9 @@ def __init__(self, hs: "HomeServer"): / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second ) + self._external_cache = hs.get_external_cache() + self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) + # Regularly wake up destinations that have outstanding PDUs to be caught up self.clock.looping_call( run_as_background_process, @@ -435,10 +437,11 @@ def __init__(self, hs: "HomeServer"): "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, ) - - self._external_cache = hs.get_external_cache() - - self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) + # Also kick off a run of the waker process at startup. + run_as_background_process( + "wake_destinations_needing_catchup", + self._wake_destinations_needing_catchup, + ) def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination From edc8155f3e18569981334d3ef97dc9837ca0e1c1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 15 Feb 2024 12:31:42 +0000 Subject: [PATCH 4/6] changelog --- changelog.d/16925.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16925.bugfix diff --git a/changelog.d/16925.bugfix b/changelog.d/16925.bugfix new file mode 100644 index 00000000000..781cad4b5be --- /dev/null +++ b/changelog.d/16925.bugfix @@ -0,0 +1 @@ +Fix a bug which meant that, under certain circumstances, we might never retry sending events or to-device messages over federation after a failure. 
From b7908ac9758af0fb98123b6087e8c1900ff5db72 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 14 Mar 2024 18:37:33 +0000 Subject: [PATCH 5/6] Add `now` param to `looping_call` --- synapse/federation/sender/__init__.py | 6 +----- synapse/util/__init__.py | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 6e64774f329..7987bfca834 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -436,11 +436,7 @@ def __init__(self, hs: "HomeServer"): WAKEUP_RETRY_PERIOD_SEC * 1000.0, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, - ) - # Also kick off a run of the waker process at startup. - run_as_background_process( - "wake_destinations_needing_catchup", - self._wake_destinations_needing_catchup, + now=True, ) def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 9e374354ec3..a44279c523d 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -117,11 +117,17 @@ def time_msec(self) -> int: return int(self.time() * 1000) def looping_call( - self, f: Callable[P, object], msec: float, *args: P.args, **kwargs: P.kwargs + self, + f: Callable[P, object], + msec: float, + *args: P.args, + now: bool = False, + **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. - Waits `msec` initially before calling `f` for the first time. + Unless `now` is `True`, waits `msec` initially before calling `f` for the first + time. If the function given to `looping_call` returns an awaitable/deferred, the next call isn't scheduled until after the returned awaitable has finished. We get @@ -134,12 +140,14 @@ def looping_call( Args: f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. - *args: Postional arguments to pass to function. 
+ *args: Positional arguments to pass to function. + now: (Must be specified as a named arg.) If `True`, the first call to the + function is scheduled immediately. **kwargs: Key arguments to pass to function. """ call = task.LoopingCall(f, *args, **kwargs) call.clock = self._reactor - d = call.start(msec / 1000.0, now=False) + d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) return call From 6c73ba27333b029f4b3105b721098550a2920ecf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Mar 2024 10:20:31 +0000 Subject: [PATCH 6/6] Fix types by using a different function --- synapse/federation/sender/__init__.py | 3 +- synapse/util/__init__.py | 40 +++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 7987bfca834..18884808815 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -431,12 +431,11 @@ def __init__(self, hs: "HomeServer"): self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) # Regularly wake up destinations that have outstanding PDUs to be caught up - self.clock.looping_call( + self.clock.looping_call_now( run_as_background_process, WAKEUP_RETRY_PERIOD_SEC * 1000.0, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, - now=True, ) def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index a44279c523d..e0d876e84bf 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -121,13 +121,11 @@ def looping_call( f: Callable[P, object], msec: float, *args: P.args, - now: bool = False, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. - Unless `now` is `True`, waits `msec` initially before calling `f` for the first - time. 
+        Waits `msec` initially before calling `f` for the first time.
 
         If the function given to `looping_call` returns an awaitable/deferred, the next
         call isn't scheduled until after the returned awaitable has finished. We get
@@ -141,10 +139,42 @@ def looping_call(
             f: The function to call repeatedly.
             msec: How long to wait between calls in milliseconds.
             *args: Positional arguments to pass to function.
-            now: (Must be specified as a named arg.) If `True`, the first call to the
-                function is scheduled immediately.
             **kwargs: Key arguments to pass to function.
         """
+        return self._looping_call_common(f, msec, False, *args, **kwargs)
+
+    def looping_call_now(
+        self,
+        f: Callable[P, object],
+        msec: float,
+        *args: P.args,
+        **kwargs: P.kwargs,
+    ) -> LoopingCall:
+        """Call a function immediately, and then repeatedly thereafter.
+
+        As with `looping_call`: subsequent calls are not scheduled until after the
+        Awaitable returned by a previous call has finished.
+
+        Also as with `looping_call`: the function is called with no logcontext and
+        you probably want to wrap it in `run_as_background_process`.
+
+        Args:
+            f: The function to call repeatedly.
+            msec: How long to wait between calls in milliseconds.
+            *args: Positional arguments to pass to function.
+            **kwargs: Key arguments to pass to function.
+        """
+        return self._looping_call_common(f, msec, True, *args, **kwargs)
+
+    def _looping_call_common(
+        self,
+        f: Callable[P, object],
+        msec: float,
+        now: bool,
+        *args: P.args,
+        **kwargs: P.kwargs,
+    ) -> LoopingCall:
+        """Common functionality for `looping_call` and `looping_call_now`"""
        call = task.LoopingCall(f, *args, **kwargs)
        call.clock = self._reactor
        d = call.start(msec / 1000.0, now=now)