Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Split out federated PDU retrieval into a non-cached version (#11242)
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods authored Nov 9, 2021
1 parent 0ef69dd commit f1d5c2f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelog.d/11242.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Split out federated PDU retrieval function into a non-cached version.
80 changes: 58 additions & 22 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,58 @@ async def backfill(

return pdus

async def get_pdu_from_destination_raw(
self,
destination: str,
event_id: str,
room_version: RoomVersion,
outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
server. Does not have any caching or rate limiting!
Args:
destination: Which homeserver to query
event_id: event to fetch
room_version: version of the room
outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitrary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
Returns:
The requested PDU, or None if we were unable to find it.
Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
"""
transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)

logger.debug(
"retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
)

pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
]

if pdu_list and pdu_list[0]:
pdu = pdu_list[0]

# Check signatures are correct.
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
return signed_pdu

return None

async def get_pdu(
self,
destinations: Iterable[str],
Expand Down Expand Up @@ -321,30 +373,14 @@ async def get_pdu(
continue

try:
transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)

logger.debug(
"retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
signed_pdu = await self.get_pdu_from_destination_raw(
destination=destination,
event_id=event_id,
room_version=room_version,
outlier=outlier,
timeout=timeout,
)

pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
]

if pdu_list and pdu_list[0]:
pdu = pdu_list[0]

# Check signatures are correct.
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)

break

pdu_attempts[destination] = now

except SynapseError as e:
Expand Down

0 comments on commit f1d5c2f

Please sign in to comment.