Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that pending to-device events are sent over federation at startup #16925

Merged
merged 8 commits into from
Mar 22, 2024
1 change: 1 addition & 0 deletions changelog.d/16925.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug which meant that, under certain circumstances, we might never retry sending events or to-device messages over federation after a failure.
14 changes: 6 additions & 8 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding. This will also be the delay applied at startup
# before trying the same.
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be wake up because
# executed every X seconds the destinations may not be woken up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60

Expand Down Expand Up @@ -428,18 +427,17 @@ def __init__(self, hs: "HomeServer"):
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)

self._external_cache = hs.get_external_cache()
self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call(
self.clock.looping_call_now(
run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)

self._external_cache = hs.get_external_cache()

self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination

Expand Down
99 changes: 79 additions & 20 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,11 @@ async def get_catch_up_outstanding_destinations(
self, after_destination: Optional[str]
) -> List[str]:
"""
Gets at most 25 destinations which have outstanding PDUs to be caught up,
and are not being backed off from
Get a list of destinations we should retry transaction sending to.

Returns up to 25 destinations which have outstanding PDUs or to-device messages,
and are not subject to a backoff.

Args:
after_destination:
If provided, all destinations must be lexicographically greater
Expand All @@ -448,30 +451,86 @@ async def get_catch_up_outstanding_destinations(
def _get_catch_up_outstanding_destinations_txn(
txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
) -> List[str]:
# We're looking for destinations which satisfy either of the following
# conditions:
#
# * There is at least one room where we have an event that we have not yet
# sent to them, indicated by a row in `destination_rooms` with a
# `stream_ordering` older than the `last_successful_stream_ordering`
# (if any) in `destinations`, or:
#
# * There is at least one to-device message outstanding for the destination,
# indicated by a row in `device_federation_outbox`.
#
# Of course, that may produce destinations where we are already busy sending
# the relevant PDU or to-device message, but in that case, waking up the
# sender will just be a no-op.
#
# From those two lists, we need to *exclude* destinations which are subject
# to a backoff (ie, where `destinations.retry_last_ts + destinations.retry_interval`
# is in the future). There is also an edge-case where, if the server was
# previously shut down in the middle of the first send attempt to a given
# destination, there may be no row in `destinations` at all; we need to include
# such rows in the output, which means we need to left-join rather than
# inner-join against `destinations`.
#
# The two sources of destinations (`destination_rooms` and
# `device_federation_outbox`) are queried separately and UNIONed; but the list
# may be very long, and we don't want to return all the rows at once. We
# therefore sort the output and just return the first 25 rows. Obviously that
# means there is no point in either of the inner queries returning more than
# 25 results, since any further results are certain to be dropped by the outer
# LIMIT. In order to help the query-optimiser understand that, we *also* sort
# and limit the *inner* queries, hence we express them as CTEs rather than
# sub-queries.
#
# (NB: we make sure to do the top-level sort and limit on the database, rather
# than making two queries and combining the result in Python. We could otherwise
# suffer from slight differences in sort order between Python and the database,
# which would make the `after_destination` condition unreliable.)

q = """
SELECT DISTINCT destination FROM destinations
INNER JOIN destination_rooms USING (destination)
WHERE
stream_ordering > last_successful_stream_ordering
AND destination > ?
AND (
retry_last_ts IS NULL OR
retry_last_ts + retry_interval < ?
)
ORDER BY destination
LIMIT 25
WITH pdu_destinations AS (
SELECT DISTINCT destination FROM destination_rooms
LEFT JOIN destinations USING (destination)
WHERE
destination > ?
AND destination_rooms.stream_ordering > COALESCE(destinations.last_successful_stream_ordering, 0)
AND (
destinations.retry_last_ts IS NULL OR
destinations.retry_last_ts + destinations.retry_interval < ?
)
ORDER BY destination
LIMIT 25
), to_device_destinations AS (
SELECT DISTINCT destination FROM device_federation_outbox
LEFT JOIN destinations USING (destination)
WHERE
destination > ?
AND (
destinations.retry_last_ts IS NULL OR
destinations.retry_last_ts + destinations.retry_interval < ?
)
ORDER BY destination
LIMIT 25
)

SELECT destination FROM pdu_destinations
UNION SELECT destination FROM to_device_destinations
ORDER BY destination
LIMIT 25
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"""

# everything is lexicographically greater than "" so this gives
# us the first batch of up to 25.
after_destination = after_destination or ""

txn.execute(
q,
(
# everything is lexicographically greater than "" so this gives
# us the first batch of up to 25.
after_destination or "",
now_time_ms,
),
(after_destination, now_time_ms, after_destination, now_time_ms),
)

destinations = [row[0] for row in txn]

return destinations

async def get_destinations_paginate(
Expand Down
44 changes: 41 additions & 3 deletions synapse/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def time_msec(self) -> int:
return int(self.time() * 1000)

def looping_call(
self, f: Callable[P, object], msec: float, *args: P.args, **kwargs: P.kwargs
self,
f: Callable[P, object],
msec: float,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Call a function repeatedly.

Expand All @@ -134,12 +138,46 @@ def looping_call(
Args:
f: The function to call repeatedly.
msec: How long to wait between calls in milliseconds.
*args: Postional arguments to pass to function.
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
return self._looping_call_common(f, msec, False, *args, **kwargs)

def looping_call_now(
self,
f: Callable[P, object],
msec: float,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Call a function immediately, and then repeatedly thereafter.

As with `looping_call`: subsequent calls are not scheduled until after the
the Awaitable returned by a previous call has finished.

Also as with `looping_call`: the function is called with no logcontext and
you probably want to wrap it in `run_as_background_process`.

Args:
f: The function to call repeatedly.
msec: How long to wait between calls in milliseconds.
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
return self._looping_call_common(f, msec, True, *args, **kwargs)

def _looping_call_common(
self,
f: Callable[P, object],
msec: float,
now: bool,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Common functionality for `looping_call` and `looping_call_now`"""
call = task.LoopingCall(f, *args, **kwargs)
call.clock = self._reactor
d = call.start(msec / 1000.0, now=False)
d = call.start(msec / 1000.0, now=now)
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
return call

Expand Down
Loading