Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that pending to-device events are sent over federation at startup #16925

Merged
merged 8 commits into from
Mar 22, 2024
1 change: 1 addition & 0 deletions changelog.d/16925.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug which meant that, under certain circumstances, we might never retry sending events or to-device messages over federation after a failure.
17 changes: 10 additions & 7 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding. This will also be the delay applied at startup
# before trying the same.
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be wake up because
# executed every X seconds the destinations may not be woken up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60

Expand Down Expand Up @@ -428,17 +427,21 @@ def __init__(self, hs: "HomeServer"):
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)

self._external_cache = hs.get_external_cache()
self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call(
run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)

self._external_cache = hs.get_external_cache()

self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
# Also kick off a run of the waker process at startup.
run_as_background_process(
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a now param to looping_call (as the underlying twisted thing supports it), that way if the initial run takes a long time a second run won't get started by the looping call.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum. Turns out this screws up the type checker.

In order to add the new parameter without breaking existing code, it needs to be a named parameter (because excess positional parameters are passed to the wrapped function). However, it turns out that ParamSpec is incompatible with additional keyword args (https://peps.python.org/pep-0612/#id2 actually mentions this: "Note that this also why we have to reject signatures of the form (*args: P.args, s: str, **kwargs: P.kwargs).)

So, three options:

  • Add a positional parameter to looping_call, and update all existing calls to looping_call (55 of them, according to my IDE).
  • Add a new method looping_call2 (or something) which takes a positional now parameter. Use it here, and deprecate looping_call.
  • Add a new method looping_call_now, which is exactly the same as looping_call except for the obvious. (The implementation of this will probably involve a private equivalent to looping_call2, shared between looping_call_now and looping_call).

My instinct is number 3, but happy with whatever you think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I took an executive decision.


def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Expand Down
99 changes: 79 additions & 20 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,11 @@ async def get_catch_up_outstanding_destinations(
self, after_destination: Optional[str]
) -> List[str]:
"""
Gets at most 25 destinations which have outstanding PDUs to be caught up,
and are not being backed off from
Get a list of destinations we should retry transaction sending to.

Returns up to 25 destinations which have outstanding PDUs or to-device messages,
and are not subject to a backoff.

Args:
after_destination:
If provided, all destinations must be lexicographically greater
Expand All @@ -448,30 +451,86 @@ async def get_catch_up_outstanding_destinations(
def _get_catch_up_outstanding_destinations_txn(
txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
) -> List[str]:
# We're looking for destinations which satisfy either of the following
# conditions:
#
# * There is at least one room where we have an event that we have not yet
# sent to them, indicated by a row in `destination_rooms` with a
# `stream_ordering` older than the `last_successful_stream_ordering`
# (if any) in `destinations`, or:
#
# * There is at least one to-device message outstanding for the destination,
# indicated by a row in `device_federation_outbox`.
#
# Of course, that may produce destinations where we are already busy sending
# the relevant PDU or to-device message, but in that case, waking up the
# sender will just be a no-op.
#
# From those two lists, we need to *exclude* destinations which are subject
# to a backoff (ie, where `destinations.retry_last_ts + destinations.retry_interval`
# is in the future). There is also an edge-case where, if the server was
# previously shut down in the middle of the first send attempt to a given
# destination, there may be no row in `destinations` at all; we need to include
# such rows in the output, which means we need to left-join rather than
# inner-join against `destinations`.
#
# The two sources of destinations (`destination_rooms` and
# `device_federation_outbox`) are queried separately and UNIONed; but the list
# may be very long, and we don't want to return all the rows at once. We
# therefore sort the output and just return the first 25 rows. Obviously that
# means there is no point in either of the inner queries returning more than
# 25 results, since any further results are certain to be dropped by the outer
# LIMIT. In order to help the query-optimiser understand that, we *also* sort
# and limit the *inner* queries, hence we express them as CTEs rather than
# sub-queries.
#
# (NB: we make sure to do the top-level sort and limit on the database, rather
# than making two queries and combining the result in Python. We could otherwise
# suffer from slight differences in sort order between Python and the database,
# which would make the `after_destination` condition unreliable.)

q = """
SELECT DISTINCT destination FROM destinations
INNER JOIN destination_rooms USING (destination)
WHERE
stream_ordering > last_successful_stream_ordering
AND destination > ?
AND (
retry_last_ts IS NULL OR
retry_last_ts + retry_interval < ?
)
ORDER BY destination
LIMIT 25
WITH pdu_destinations AS (
SELECT DISTINCT destination FROM destination_rooms
LEFT JOIN destinations USING (destination)
WHERE
destination > ?
AND destination_rooms.stream_ordering > COALESCE(destinations.last_successful_stream_ordering, 0)
AND (
destinations.retry_last_ts IS NULL OR
destinations.retry_last_ts + destinations.retry_interval < ?
)
ORDER BY destination
LIMIT 25
), to_device_destinations AS (
SELECT DISTINCT destination FROM device_federation_outbox
LEFT JOIN destinations USING (destination)
WHERE
destination > ?
AND (
destinations.retry_last_ts IS NULL OR
destinations.retry_last_ts + destinations.retry_interval < ?
)
ORDER BY destination
LIMIT 25
)

SELECT destination FROM pdu_destinations
UNION SELECT destination FROM to_device_destinations
ORDER BY destination
LIMIT 25
richvdh marked this conversation as resolved.
Show resolved Hide resolved
"""

# everything is lexicographically greater than "" so this gives
# us the first batch of up to 25.
after_destination = after_destination or ""

txn.execute(
q,
(
# everything is lexicographically greater than "" so this gives
# us the first batch of up to 25.
after_destination or "",
now_time_ms,
),
(after_destination, now_time_ms, after_destination, now_time_ms),
)

destinations = [row[0] for row in txn]

return destinations

async def get_destinations_paginate(
Expand Down
Loading