Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add type hints for the federation sender. (#9681)
Browse files Browse the repository at this point in the history
Includes an abstract base class which both the FederationSender
and the FederationRemoteSendQueue must implement.
  • Loading branch information
clokep authored Mar 29, 2021
1 parent 4bbd535 commit da75d2e
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 59 deletions.
1 change: 1 addition & 0 deletions changelog.d/9681.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add additional type hints to the Homeserver object.
7 changes: 0 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,13 +787,6 @@ def __init__(self, hs: GenericWorkerServer):

self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")

def on_start(self):
# There may be some events that are persisted but haven't been sent,
# so send them now.
self.federation_sender.notify_new_events(
self.store.get_room_max_stream_ordering()
)

def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)

Expand Down
88 changes: 57 additions & 31 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,39 @@

import logging
from collections import namedtuple
from typing import Dict, List, Tuple, Type
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
List,
Optional,
Sized,
Tuple,
Type,
)

from sortedcontainers import SortedDict

from twisted.internet import defer

from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure

from .units import Edu

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class FederationRemoteSendQueue:
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
Expand All @@ -58,7 +72,7 @@ def __init__(self, hs):
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
self._sender_positions = {}
self._sender_positions = {} # type: Dict[str, int]

# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
Expand All @@ -71,7 +85,7 @@ def __init__(self, hs):
# Stream position -> (user_id, destinations)
self.presence_destinations = (
SortedDict()
) # type: SortedDict[int, Tuple[str, List[str]]]
) # type: SortedDict[int, Tuple[str, Iterable[str]]]

# (destination, key) -> EDU
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
Expand All @@ -94,7 +108,7 @@ def __init__(self, hs):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
Expand All @@ -115,13 +129,13 @@ def register(name, queue):

self.clock.looping_call(self._clear_queue, 30 * 1000)

def _next_pos(self):
def _next_pos(self) -> int:
pos = self.pos
self.pos += 1
self.pos_time[self.clock.time_msec()] = pos
return pos

def _clear_queue(self):
def _clear_queue(self) -> None:
"""Clear the queues for anything older than N minutes"""

FIVE_MINUTES_AGO = 5 * 60 * 1000
Expand All @@ -138,7 +152,7 @@ def _clear_queue(self):

self._clear_queue_before_pos(position_to_delete)

def _clear_queue_before_pos(self, position_to_delete):
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
Expand Down Expand Up @@ -188,13 +202,18 @@ def _clear_queue_before_pos(self, position_to_delete):
for key in keys[:i]:
del self.edus[key]

def notify_new_events(self, max_token):
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
# This should never get called.
raise NotImplementedError()

def build_and_send_edu(self, destination, edu_type, content, key=None):
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[Hashable] = None,
) -> None:
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
Expand All @@ -218,38 +237,39 @@ def build_and_send_edu(self, destination, edu_type, content, key=None):

self.notifier.on_new_replication_data()

def send_read_receipt(self, receipt):
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""As per FederationSender
Args:
receipt (synapse.types.ReadReceipt):
receipt:
"""
# nothing to do here: the replication listener will handle it.
return defer.succeed(None)

def send_presence(self, states):
def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Args:
states (list(UserPresenceState))
states
"""
pos = self._next_pos()

# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
local_states = [s for s in states if self.is_mine_id(s.user_id)]

self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]

self.notifier.on_new_replication_data()

def send_presence_to_destinations(self, states, destinations):
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
Args:
states (list[UserPresenceState])
destinations (list[str])
states
destinations
"""
for state in states:
pos = self._next_pos()
Expand All @@ -258,15 +278,18 @@ def send_presence_to_destinations(self, states, destinations):

self.notifier.on_new_replication_data()

def send_device_messages(self, destination):
def send_device_messages(self, destination: str) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.

def get_current_token(self):
def wake_destination(self, server: str) -> None:
pass

def get_current_token(self) -> int:
return self.pos - 1

def federation_ack(self, instance_name, token):
def federation_ack(self, instance_name: str, token: int) -> None:
if self._sender_instances:
# If we have configured multiple federation sender instances we need
# to track their positions separately, and only clear the queue up
Expand Down Expand Up @@ -504,13 +527,16 @@ def add_to_buffer(self, buff):
)


def process_rows_for_federation(transaction_queue, rows):
def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
"""Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args:
transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
transaction_queue
rows
"""

# The federation stream contains a bunch of different types of
Expand Down
Loading

0 comments on commit da75d2e

Please sign in to comment.