From 2b328d7e0233bc331ce567e593b4c2f94a7d6e2e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 12 Mar 2021 15:08:03 +0000 Subject: [PATCH] Improve logging when processing incoming transactions (#9596) Put the room id in the logcontext, to make it easier to understand what's going on. --- changelog.d/9596.misc | 1 + synapse/federation/federation_server.py | 61 +++++++++++++----------- synapse/handlers/federation.py | 62 +++++++------------------ 3 files changed, 51 insertions(+), 73 deletions(-) create mode 100644 changelog.d/9596.misc diff --git a/changelog.d/9596.misc b/changelog.d/9596.misc new file mode 100644 index 000000000000..fc19a95f7532 --- /dev/null +++ b/changelog.d/9596.misc @@ -0,0 +1 @@ +Improve logging when processing incoming transactions. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 06c5e7a9e0f3..89fe6def4b9a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -335,34 +335,41 @@ async def _handle_pdus_in_txn( # impose a limit to avoid going too crazy with ram/cpu. async def process_pdus_for_room(room_id: str): - logger.debug("Processing PDUs for %s", room_id) - try: - await self.check_server_matches_acl(origin_host, room_id) - except AuthError as e: - logger.warning("Ignoring PDUs for room %s from banned server", room_id) - for pdu in pdus_by_room[room_id]: - event_id = pdu.event_id - pdu_results[event_id] = e.error_dict() - return + with nested_logging_context(room_id): + logger.debug("Processing PDUs for %s", room_id) - for pdu in pdus_by_room[room_id]: - event_id = pdu.event_id - with pdu_process_time.time(): - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore - ) + try: + await self.check_server_matches_acl(origin_host, room_id) + except AuthError as e: + logger.warning( + "Ignoring PDUs for room %s from banned server", room_id + ) + for pdu in pdus_by_room[room_id]: + event_id = pdu.event_id + pdu_results[event_id] = e.error_dict() + return + + for pdu in pdus_by_room[room_id]: + pdu_results[pdu.event_id] = await process_pdu(pdu) + + async def process_pdu(pdu: EventBase) -> JsonDict: + event_id = pdu.event_id + with pdu_process_time.time(): + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + return {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + return {"error": str(e)} + except Exception as e: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + return {"error": str(e)} await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3fe02b719595..1d20c441f3af 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -201,7 +201,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: or pdu.internal_metadata.is_outlier() ) if already_seen: - logger.debug("[%s %s]: Already seen pdu", room_id, event_id) + logger.debug("Already seen pdu") return # do some initial sanity-checking of the event. In particular, make @@ -210,18 +210,14 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: try: self._sanity_check_event(pdu) except SynapseError as err: - logger.warning( - "[%s %s] Received event failed sanity checks", room_id, event_id - ) + logger.warning("Received event failed sanity checks") raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id) # If we are currently in the process of joining this room, then we # queue up events for later processing. if room_id in self.room_queues: logger.info( - "[%s %s] Queuing PDU from %s for now: join in progress", - room_id, - event_id, + "Queuing PDU from %s for now: join in progress", origin, ) self.room_queues[room_id].append((pdu, origin)) @@ -236,9 +232,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: is_in_room = await self.auth.check_host_in_room(room_id, self.server_name) if not is_in_room: logger.info( - "[%s %s] Ignoring PDU from %s as we're not in the room", - room_id, - event_id, + "Ignoring PDU from %s as we're not in the room", origin, ) return None @@ -250,7 +244,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: # We only backfill backwards to the min depth. min_depth = await self.get_min_depth_for_context(pdu.room_id) - logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth) + logger.debug("min_depth: %d", min_depth) prevs = set(pdu.prev_event_ids()) seen = await self.store.have_events_in_timeline(prevs) @@ -267,17 +261,13 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( - "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", - room_id, - event_id, + "Acquiring room lock to fetch %d missing prev_events: %s", len(missing_prevs), shortstr(missing_prevs), ) with (await self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "[%s %s] Acquired room lock to fetch %d missing prev_events", - room_id, - event_id, + "Acquired room lock to fetch %d missing prev_events", len(missing_prevs), ) @@ -297,9 +287,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: if not prevs - seen: logger.info( - "[%s %s] Found all missing prev_events", - room_id, - event_id, + "Found all missing prev_events", ) if prevs - seen: @@ -329,9 +317,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: if sent_to_us_directly: logger.warning( - "[%s %s] Rejecting: failed to fetch %d prev events: %s", - room_id, - event_id, + "Rejecting: failed to fetch %d prev events: %s", len(prevs - seen), shortstr(prevs - seen), ) @@ -414,10 +400,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: state = [event_map[e] for e in state_map.values()] except Exception: logger.warning( - "[%s %s] Error attempting to resolve state at missing " - "prev_events", - room_id, - event_id, + "Error attempting to resolve state at missing " "prev_events", exc_info=True, ) raise FederationError( @@ -454,9 +437,7 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): latest |= seen logger.info( - "[%s %s]: Requesting missing events between %s and %s", - room_id, - event_id, + "Requesting missing events between %s and %s", shortstr(latest), event_id, ) @@ -523,15 +504,11 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): # We failed to get the missing events, but since we need to handle # the case of `get_missing_events` not returning the necessary # events anyway, it is safe to simply log the error and continue. - logger.warning( - "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e - ) + logger.warning("Failed to get prev_events: %s", e) return logger.info( - "[%s %s]: Got %d prev_events: %s", - room_id, - event_id, + "Got %d prev_events: %s", len(missing_events), shortstr(missing_events), ) @@ -542,9 +519,7 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): for ev in missing_events: logger.info( - "[%s %s] Handling received prev_event %s", - room_id, - event_id, + "Handling received prev_event %s", ev.event_id, ) with nested_logging_context(ev.event_id): @@ -553,9 +528,7 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): except FederationError as e: if e.code == 403: logger.warning( - "[%s %s] Received prev_event %s failed history check.", - room_id, - event_id, + "Received prev_event %s failed history check.", ev.event_id, ) else: @@ -707,10 +680,7 @@ async def _process_received_pdu( (ie, we are missing one or more prev_events), the resolved state at the event """ - room_id = event.room_id - event_id = event.event_id - - logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) + logger.debug("Processing event: %s", event) try: await self._handle_new_event(origin, event, state=state)