diff --git a/changelog.d/8272.bugfix b/changelog.d/8272.bugfix new file mode 100644 index 000000000000..532d0e22fefb --- /dev/null +++ b/changelog.d/8272.bugfix @@ -0,0 +1 @@ +Fix messages over federation being lost until an event is sent into the same room. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 9f0852b4a2e2..2657767fd153 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,7 +15,7 @@ # limitations under the License. import datetime import logging -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast from prometheus_client import Counter @@ -92,6 +92,21 @@ def __init__( self._destination = destination self.transmission_loop_running = False + # True whilst we are sending events that the remote homeserver missed + # because it was unreachable. We start in this state so we can perform + # catch-up at startup. + # New events will only be sent once this is finished, at which point + # _catching_up is flipped to False. + self._catching_up = True # type: bool + + # The stream_ordering of the most recent PDU that was discarded due to + # being in catch-up mode. + self._catchup_last_skipped = 0 # type: int + + # Cache of the last successfully-transmitted stream ordering for this + # destination (we are the only updater so this is safe) + self._last_successful_stream_ordering = None # type: Optional[int] + # a list of pending PDUs self._pending_pdus = [] # type: List[EventBase] @@ -138,7 +153,13 @@ def send_pdu(self, pdu: EventBase) -> None: Args: pdu: pdu to send """ - self._pending_pdus.append(pdu) + if not self._catching_up or self._last_successful_stream_ordering is None: + # only enqueue the PDU if we are not catching up (False) or do not + # yet know if we have anything to catch up (None) + self._pending_pdus.append(pdu) + else: + self._catchup_last_skipped = pdu.internal_metadata.stream_ordering + self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: @@ -218,6 +239,13 @@ async def _transaction_transmission_loop(self) -> None: # hence why we throw the result away. await get_retry_limiter(self._destination, self._clock, self._store) + if self._catching_up: + # we potentially need to catch-up first + await self._catch_up_transmission_loop() + if self._catching_up: + # not caught up yet + return + pending_pdus = [] while True: # We have to keep 2 free slots for presence and rr_edus @@ -351,8 +379,9 @@ async def _transaction_transmission_loop(self) -> None: if e.retry_interval > 60 * 60 * 1000: # we won't retry for another hour! # (this suggests a significant outage) - # We drop pending PDUs and EDUs because otherwise they will + # We drop pending EDUs because otherwise they will # rack up indefinitely. + # (Dropping PDUs is already performed by `_start_catching_up`.) # Note that: # - the EDUs that are being dropped here are those that we can # afford to drop (specifically, only typing notifications, @@ -364,11 +393,12 @@ async def _transaction_transmission_loop(self) -> None: # dropping read receipts is a bit sad but should be solved # through another mechanism, because this is all volatile! - self._pending_pdus = [] self._pending_edus = [] self._pending_edus_keyed = {} self._pending_presence = {} self._pending_rrs = {} + + self._start_catching_up() except FederationDeniedError as e: logger.info(e) except HttpResponseException as e: @@ -378,6 +408,8 @@ async def _transaction_transmission_loop(self) -> None: e.code, e, ) + + self._start_catching_up() except RequestSendFailed as e: logger.warning( "TX [%s] Failed to send transaction: %s", self._destination, e @@ -387,16 +419,96 @@ async def _transaction_transmission_loop(self) -> None: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) + + self._start_catching_up() except Exception: logger.exception("TX [%s] Failed to send transaction", self._destination) for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) + + self._start_catching_up() finally: # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False + async def _catch_up_transmission_loop(self) -> None: + first_catch_up_check = self._last_successful_stream_ordering is None + + if first_catch_up_check: + # first catchup so get last_successful_stream_ordering from database + self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering( + self._destination + ) + + if self._last_successful_stream_ordering is None: + # if it's still None, then this means we don't have the information + # in our database ­ we haven't successfully sent a PDU to this server + # (at least since the introduction of the feature tracking + # last_successful_stream_ordering). + # Sadly, this means we can't do anything here as we don't know what + # needs catching up — so catching up is futile; let's stop. + self._catching_up = False + return + + # get at most 50 catchup room/PDUs + while True: + event_ids = await self._store.get_catch_up_room_event_ids( + self._destination, self._last_successful_stream_ordering, + ) + + if not event_ids: + # No more events to catch up on, but we can't ignore the chance + # of a race condition, so we check that no new events have been + # skipped due to us being in catch-up mode + + if self._catchup_last_skipped > self._last_successful_stream_ordering: + # another event has been skipped because we were in catch-up mode + continue + + # we are done catching up! + self._catching_up = False + break + + if first_catch_up_check: + # as this is our check for needing catch-up, we may have PDUs in + # the queue from before we *knew* we had to do catch-up, so + # clear those out now. + self._start_catching_up() + + # fetch the relevant events from the event store + # - redacted behaviour of REDACT is fine, since we only send metadata + # of redacted events to the destination. + # - don't need to worry about rejected events as we do not actively + # forward received events over federation. + catchup_pdus = await self._store.get_events_as_list(event_ids) + if not catchup_pdus: + raise AssertionError( + "No events retrieved when we asked for %r. " + "This should not happen." % event_ids + ) + + if logger.isEnabledFor(logging.INFO): + rooms = (p.room_id for p in catchup_pdus) + logger.info("Catching up rooms to %s: %r", self._destination, rooms) + + success = await self._transaction_manager.send_new_transaction( + self._destination, catchup_pdus, [] + ) + + if not success: + return + + sent_transactions_counter.inc() + final_pdu = catchup_pdus[-1] + self._last_successful_stream_ordering = cast( + int, final_pdu.internal_metadata.stream_ordering + ) + await self._store.set_destination_last_successful_stream_ordering( + self._destination, self._last_successful_stream_ordering + ) + def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: return @@ -457,3 +569,12 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int] ] return (edus, stream_id) + + def _start_catching_up(self) -> None: + """ + Marks this destination as being in catch-up mode. + + This throws away the PDU queue. + """ + self._catching_up = True + self._pending_pdus = [] diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index c0a958252e5e..091367006e17 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -15,7 +15,7 @@ import logging from collections import namedtuple -from typing import Iterable, Optional, Tuple +from typing import Iterable, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -371,3 +371,44 @@ async def set_destination_last_successful_stream_ordering( values={"last_successful_stream_ordering": last_successful_stream_ordering}, desc="set_last_successful_stream_ordering", ) + + async def get_catch_up_room_event_ids( + self, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + """ + Returns at most 50 event IDs and their corresponding stream_orderings + that correspond to the oldest events that have not yet been sent to + the destination. + + Args: + destination: the destination in question + last_successful_stream_ordering: the stream_ordering of the + most-recently successfully-transmitted event to the destination + + Returns: + list of event_ids + """ + return await self.db_pool.runInteraction( + "get_catch_up_room_event_ids", + self._get_catch_up_room_event_ids_txn, + destination, + last_successful_stream_ordering, + ) + + @staticmethod + def _get_catch_up_room_event_ids_txn( + txn, destination: str, last_successful_stream_ordering: int, + ) -> List[str]: + q = """ + SELECT event_id FROM destination_rooms + JOIN events USING (stream_ordering) + WHERE destination = ? + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT 50 + """ + txn.execute( + q, (destination, last_successful_stream_ordering), + ) + event_ids = [row[0] for row in txn] + return event_ids diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 6cdcc378f098..cc52c3dfac0a 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -1,5 +1,10 @@ +from typing import List, Tuple + from mock import Mock +from synapse.events import EventBase +from synapse.federation.sender import PerDestinationQueue, TransactionManager +from synapse.federation.units import Edu from synapse.rest import admin from synapse.rest.client.v1 import login, room @@ -156,3 +161,163 @@ def test_catch_up_last_successful_stream_ordering_tracking(self): row_2["stream_ordering"], "Send succeeded but not marked as last_successful_stream_ordering", ) + + @override_config({"send_federation": True}) # critical to federate + def test_catch_up_from_blank_state(self): + """ + Runs an overall test of federation catch-up from scratch. + Further tests will focus on more narrow aspects and edge-cases, but I + hope to provide an overall view with this test. + """ + # bring the other server online + self.is_online = True + + # let's make some events for the other server to receive + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room_1 = self.helper.create_room_as("u1", tok=u1_token) + room_2 = self.helper.create_room_as("u1", tok=u1_token) + + # also critical to federate + self.get_success( + event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join") + ) + self.get_success( + event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join") + ) + + self.helper.send_state( + room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token + ) + + # check: PDU received for topic event + self.assertEqual(len(self.pdus), 1) + self.assertEqual(self.pdus[0]["type"], "m.room.topic") + + # take the remote offline + self.is_online = False + + # send another event + self.helper.send(room_1, "hi user!", tok=u1_token) + + # check: things didn't go well since the remote is down + self.assertEqual(len(self.failed_pdus), 1) + self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!") + + # let's delete the federation transmission queue + # (this pretends we are starting up fresh.) + self.assertFalse( + self.hs.get_federation_sender() + ._per_destination_queues["host2"] + .transmission_loop_running + ) + del self.hs.get_federation_sender()._per_destination_queues["host2"] + + # let's also clear any backoffs + self.get_success( + self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0) + ) + + # bring the remote online and clear the received pdu list + self.is_online = True + self.pdus = [] + + # now we need to initiate a federation transaction somehow… + # to do that, let's send another event (because it's simple to do) + # (do it to another room otherwise the catch-up logic decides it doesn't + # need to catch up room_1 — something I overlooked when first writing + # this test) + self.helper.send(room_2, "wombats!", tok=u1_token) + + # we should now have received both PDUs + self.assertEqual(len(self.pdus), 2) + self.assertEqual(self.pdus[0]["content"]["body"], "hi user!") + self.assertEqual(self.pdus[1]["content"]["body"], "wombats!") + + def make_fake_destination_queue( + self, destination: str = "host2" + ) -> Tuple[PerDestinationQueue, List[EventBase]]: + """ + Makes a fake per-destination queue. + """ + transaction_manager = TransactionManager(self.hs) + per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination) + results_list = [] + + async def fake_send( + destination_tm: str, + pending_pdus: List[EventBase], + _pending_edus: List[Edu], + ) -> bool: + assert destination == destination_tm + results_list.extend(pending_pdus) + return True # success! + + transaction_manager.send_new_transaction = fake_send + + return per_dest_queue, results_list + + @override_config({"send_federation": True}) + def test_catch_up_loop(self): + """ + Tests the behaviour of _catch_up_transmission_loop. + """ + + # ARRANGE: + # - a local user (u1) + # - 3 rooms which u1 is joined to (and remote user @user:host2 is + # joined to) + # - some events (1 to 5) in those rooms + # we have 'already sent' events 1 and 2 to host2 + per_dest_queue, sent_pdus = self.make_fake_destination_queue() + + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room_1 = self.helper.create_room_as("u1", tok=u1_token) + room_2 = self.helper.create_room_as("u1", tok=u1_token) + room_3 = self.helper.create_room_as("u1", tok=u1_token) + self.get_success( + event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join") + ) + self.get_success( + event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join") + ) + self.get_success( + event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join") + ) + + # create some events + self.helper.send(room_1, "you hear me!!", tok=u1_token) + event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"] + self.helper.send(room_3, "Matrix!", tok=u1_token) + event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"] + event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"] + + # destination_rooms should already be populated, but let us pretend that we already + # sent (successfully) up to and including event id 2 + event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2)) + + # also fetch event 5 so we know its last_successful_stream_ordering later + event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5)) + + self.get_success( + self.hs.get_datastore().set_destination_last_successful_stream_ordering( + "host2", event_2.internal_metadata.stream_ordering + ) + ) + + # ACT + self.get_success(per_dest_queue._catch_up_transmission_loop()) + + # ASSERT, noticing in particular: + # - event 3 not sent out, because event 5 replaces it + # - order is least recent first, so event 5 comes after event 4 + # - catch-up is completed + self.assertEqual(len(sent_pdus), 2) + self.assertEqual(sent_pdus[0].event_id, event_id_4) + self.assertEqual(sent_pdus[1].event_id, event_id_5) + self.assertFalse(per_dest_queue._catching_up) + self.assertEqual( + per_dest_queue._last_successful_stream_ordering, + event_5.internal_metadata.stream_ordering, + ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 7bf15c4ba93f..8822741e907a 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -73,6 +73,7 @@ def make_homeserver(self, reactor, clock): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_last_successful_stream_ordering", "get_destination_retry_timings", "get_devices_by_remote", "maybe_store_room_on_invite", @@ -120,6 +121,10 @@ def prepare(self, reactor, clock, hs): (0, []) ) + self.datastore.get_destination_last_successful_stream_ordering.return_value = make_awaitable( + None + ) + def get_received_txn_response(*args): return defer.succeed(None)