
Catch-up after Federation Outage (split, 4): catch-up loop #8272

Merged: 18 commits, Sep 15, 2020
Changes from 12 commits
1 change: 1 addition & 0 deletions changelog.d/8272.bugfix
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
125 changes: 121 additions & 4 deletions synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast

from prometheus_client import Counter

@@ -92,6 +92,23 @@ def __init__(
        self._destination = destination
        self.transmission_loop_running = False

        # True whilst we are sending events that the remote homeserver missed
        # because it was unreachable.
        # False whilst we are known to not be catching up.
        # None when it has not yet been determined whether we are catching up
        # (we work this out lazily).
        # New events will only be sent once this is finished, at which point
        # _catching_up is flipped to False.
        self._catching_up = None  # type: Optional[bool]

        # The stream_ordering of the most recent PDU that was discarded due to
        # being in catch-up mode, or None if not applicable.
        self._catchup_last_skipped = None  # type: Optional[int]

        # Cache of the last successfully-transmitted stream ordering for this
        # destination (we are the only updater so this is safe)
        self._last_successful_stream_ordering = None  # type: Optional[int]

        # a list of pending PDUs
        self._pending_pdus = []  # type: List[EventBase]
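
A toy model of how the three values of `_catching_up` are meant to change over the life of a queue may help when reading the rest of the diff. The class and method names below are invented for illustration; this is a sketch, not Synapse code:

```python
from typing import Optional


class CatchUpFlagDemo:
    """Illustrative only: mirrors the tri-state flag documented above."""

    def __init__(self) -> None:
        self.state = None  # type: Optional[bool]  # not yet determined (lazy)

    def on_transmission_failure(self) -> None:
        # mirrors _start_catching_up() later in this file: a failed or badly
        # delayed transaction flips us into catch-up mode
        self.state = True

    def on_catch_up_complete(self) -> None:
        # mirrors the end of _catch_up_transmission_loop(): nothing left to
        # replay, so normal transmission resumes
        self.state = False


flag = CatchUpFlagDemo()
assert flag.state is None    # fresh queue: we have not checked yet
flag.on_transmission_failure()
assert flag.state is True    # replaying missed events from the database
flag.on_catch_up_complete()
assert flag.state is False   # caught up; new PDUs flow through the queue again
```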

@@ -138,7 +155,11 @@ def send_pdu(self, pdu: EventBase) -> None:
        Args:
            pdu: pdu to send
        """
        self._pending_pdus.append(pdu)
        if not self._catching_up:

Review comment (Member, resolved): if this is hit while _catching_up is None, then the PDU will get dropped?
Reply (Contributor Author): Not the case, but agree the readability is horrible like this -- should be sorted now :)

            # only enqueue the PDU if we are not catching up (False) or do not
            # yet know if we are to catch up (None)
            self._pending_pdus.append(pdu)

        self.attempt_new_transaction()

    def send_presence(self, states: Iterable[UserPresenceState]) -> None:
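
The reviewer's question above comes down to Python truthiness: `not self._catching_up` is true for both `False` and `None`, so a PDU that arrives before we have determined whether catch-up is needed is still queued. A quick stand-alone check (not part of the diff):

```python
from typing import Optional


def would_enqueue(catching_up: Optional[bool]) -> bool:
    # the same condition used by `if not self._catching_up:` in send_pdu
    return not catching_up


assert would_enqueue(None)      # undetermined: still enqueue the PDU
assert would_enqueue(False)     # caught up: enqueue as normal
assert not would_enqueue(True)  # catching up: skip; the catch-up loop will
                                # pick the event out of the database instead
```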
@@ -218,6 +239,13 @@ async def _transaction_transmission_loop(self) -> None:
            # hence why we throw the result away.
            await get_retry_limiter(self._destination, self._clock, self._store)

            if self._catching_up is None or self._catching_up is True:
                # we potentially need to catch up first
                await self._catch_up_transmission_loop()
                if self._catching_up:
                    # not caught up yet
                    return

            pending_pdus = []
            while True:
                # We have to keep 2 free slots for presence and rr_edus
@@ -351,8 +379,9 @@ async def _transaction_transmission_loop(self) -> None:
            if e.retry_interval > 60 * 60 * 1000:
                # we won't retry for another hour!
                # (this suggests a significant outage)
                # We drop pending PDUs and EDUs because otherwise they will
                # We drop pending EDUs because otherwise they will
                # rack up indefinitely.
                # (Dropping PDUs is already performed by `_start_catching_up`.)
                # Note that:
                # - the EDUs that are being dropped here are those that we can
                #   afford to drop (specifically, only typing notifications,
@@ -364,11 +393,12 @@ async def _transaction_transmission_loop(self) -> None:

                # dropping read receipts is a bit sad but should be solved
                # through another mechanism, because this is all volatile!
                self._pending_pdus = []
                self._pending_edus = []
                self._pending_edus_keyed = {}
                self._pending_presence = {}
                self._pending_rrs = {}

                self._start_catching_up()
        except FederationDeniedError as e:
            logger.info(e)
        except HttpResponseException as e:
@@ -378,6 +408,8 @@ async def _transaction_transmission_loop(self) -> None:
                e.code,
                e,
            )

            self._start_catching_up()
        except RequestSendFailed as e:
            logger.warning(
                "TX [%s] Failed to send transaction: %s", self._destination, e
@@ -387,16 +419,92 @@ async def _transaction_transmission_loop(self) -> None:
                logger.info(
                    "Failed to send event %s to %s", p.event_id, self._destination
                )

            self._start_catching_up()
        except Exception:
            logger.exception("TX [%s] Failed to send transaction", self._destination)
            for p in pending_pdus:
                logger.info(
                    "Failed to send event %s to %s", p.event_id, self._destination
                )

            self._start_catching_up()
        finally:
            # We want to be *very* sure we clear this after we stop processing
            self.transmission_loop_running = False

    async def _catch_up_transmission_loop(self) -> None:
        if self._last_successful_stream_ordering is None:
            # first catch-up, so get from database
            self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
                self._destination
            )

        if self._last_successful_stream_ordering is None:
            # if it's still None, then this means we don't have the information
            # in our database (oh, the perils of being a new feature).
            # So we can't actually do anything here, and in this case, we don't
            # know what to catch up, sadly.
            # Trying to catch up right now is futile, so let's stop.
            self._catching_up = False
            return

        # get at most 50 catchup room/PDUs
        while True:
            event_ids = await self._store.get_catch_up_room_event_ids(
                self._destination, self._last_successful_stream_ordering,
            )

            if not event_ids:
                # No more events to catch up on, but we can't ignore the chance
                # of a race condition, so we check that no new events have been
                # skipped due to us being in catch-up mode

                if (
                    self._catchup_last_skipped is not None
                    and self._catchup_last_skipped
                    > self._last_successful_stream_ordering
                ):
                    # another event has been skipped because we were in catch-up mode
                    continue

                # we are done catching up!
                self._catching_up = False
                break

            if self._catching_up is None:
                # we didn't know if we're in catch-up mode yet but now we know
                # we are, so mark us as in catch-up mode and drop the queue.
                self._start_catching_up()

            # fetch the relevant events from the event store
            # - redacted behaviour of REDACT is fine, since we only send metadata
            #   of redacted events to the destination.
            # - don't need to worry about rejected events as we do not actively
            #   forward received events over federation.
            catchup_pdus = await self._store.get_events_as_list(event_ids)
            if not catchup_pdus:
                raise AssertionError(
                    "No events retrieved when we asked for %r. "
                    "This should not happen." % event_ids
                )

            success = await self._transaction_manager.send_new_transaction(
                self._destination, catchup_pdus, []
            )

            if not success:
                return

            sent_transactions_counter.inc()
            final_pdu = catchup_pdus[-1]
            self._last_successful_stream_ordering = cast(
                int, final_pdu.internal_metadata.stream_ordering
            )
            await self._store.set_destination_last_successful_stream_ordering(
                self._destination, self._last_successful_stream_ordering
            )

    def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
        if not self._pending_rrs:
            return
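
The `continue` in the race check above is the subtle part: a PDU can arrive (and be skipped by `send_pdu`) in the window between the last database query and marking ourselves caught up. Here is a toy, self-contained simulation of that interplay, with invented stream orderings; the helper names and numbers are mine, not the PR's:

```python
from typing import List, Optional, Tuple


def simulate_catch_up(
    missed: List[int], last_acked: int, new_event_during_catch_up: Optional[int]
) -> Tuple[int, List[List[int]]]:
    """Toy simulation of the catch-up loop above (illustrative only).

    `missed` holds stream orderings already in the database that the remote
    never received; `new_event_during_catch_up` models a PDU that arrives
    while we are catching up and is therefore skipped by send_pdu(), which
    only records it in _catchup_last_skipped.
    """
    catchup_last_skipped = None  # type: Optional[int]
    batches = []  # type: List[List[int]]
    while True:
        batch = sorted(so for so in missed if so > last_acked)[:50]
        if not batch:
            if catchup_last_skipped is not None and catchup_last_skipped > last_acked:
                # the race check: an event was skipped while we were busy,
                # so query the database again rather than declaring victory
                missed = missed + [catchup_last_skipped]
                catchup_last_skipped = None
                continue
            break  # genuinely caught up
        batches.append(batch)
        last_acked = batch[-1]  # persisted as last_successful_stream_ordering
        if new_event_during_catch_up is not None:
            catchup_last_skipped = new_event_during_catch_up
            new_event_during_catch_up = None
    return last_acked, batches


# 60 events (101..160) were missed; event 161 arrives mid catch-up.
last, batches = simulate_catch_up(list(range(101, 161)), 100, 161)
assert last == 161
assert [len(b) for b in batches] == [50, 10, 1]
```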
@@ -457,3 +565,12 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
        ]

        return (edus, stream_id)

    def _start_catching_up(self) -> None:
        """
        Marks this destination as being in catch-up mode.

        This throws away the PDU queue.
        """
        self._catching_up = True
        self._pending_pdus = []
43 changes: 42 additions & 1 deletion synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import Iterable, Optional, Tuple
from typing import Iterable, List, Optional, Tuple

from canonicaljson import encode_canonical_json

@@ -371,3 +371,44 @@ async def set_destination_last_successful_stream_ordering(
            values={"last_successful_stream_ordering": last_successful_stream_ordering},
            desc="set_last_successful_stream_ordering",
        )

    async def get_catch_up_room_event_ids(
        self, destination: str, last_successful_stream_ordering: int,
    ) -> List[str]:
        """
        Returns at most 50 event IDs corresponding to the oldest events that
        have not yet been sent to the destination.

        Args:
            destination: the destination in question
            last_successful_stream_ordering: the stream_ordering of the
                most-recently successfully-transmitted event to the destination

        Returns:
            list of event_ids
        """
        return await self.db_pool.runInteraction(
            "get_catch_up_room_event_ids",
            self._get_catch_up_room_event_ids_txn,
            destination,
            last_successful_stream_ordering,
        )

    @staticmethod
    def _get_catch_up_room_event_ids_txn(
        txn, destination: str, last_successful_stream_ordering: int,
    ) -> List[str]:
        q = """
            SELECT event_id FROM destination_rooms
            JOIN events USING (stream_ordering)
            WHERE destination = ?
            AND stream_ordering > ?
            ORDER BY stream_ordering
            LIMIT 50
        """
        txn.execute(
            q, (destination, last_successful_stream_ordering),
        )
        event_ids = [row[0] for row in txn]
        return event_ids
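
The query above leans on the `destination_rooms` table introduced earlier in this PR series, which (as I understand it) keeps one row per (destination, room) holding the stream ordering of the newest event that should reach that destination. Here is a pure-Python rendering of what the SQL computes, with made-up data, to illustrate the "latest pending event per lagging room, oldest first, capped at 50" behaviour; none of these names or values come from the diff:

```python
from typing import Dict, List, Tuple


def catch_up_event_ids(
    destination_rooms: Dict[Tuple[str, str], int],  # (destination, room_id) -> stream_ordering
    events: Dict[int, str],                         # stream_ordering -> event_id
    destination: str,
    last_successful_stream_ordering: int,
) -> List[str]:
    """Illustrative Python equivalent of the SQL above (not Synapse code)."""
    orderings = sorted(
        so
        for (dest, _room), so in destination_rooms.items()
        if dest == destination and so > last_successful_stream_ordering
    )
    return [events[so] for so in orderings[:50]]


rooms = {
    ("remote.example", "!a:remote.example"): 105,
    ("remote.example", "!b:remote.example"): 103,
    ("elsewhere.example", "!c:elsewhere.example"): 107,
}
evs = {103: "$event_103", 105: "$event_105", 107: "$event_107"}
assert catch_up_event_ids(rooms, evs, "remote.example", 100) == ["$event_103", "$event_105"]
```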