Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stagger send presence to remotes #10398

Merged
merged 6 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10398.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stagger sending of presence update to remote servers, reducing CPU spikes caused by starting many connections to remote servers at once.
96 changes: 94 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import abc
import logging
from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple

import attr
from prometheus_client import Counter
from typing_extensions import Literal

from twisted.internet import defer

Expand All @@ -33,8 +36,12 @@
event_processing_loop_room_count,
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util import Clock
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand Down Expand Up @@ -137,6 +144,84 @@ async def get_replication_rows(
raise NotImplementedError()


@attr.s
class _PresenceQueue:
"""A queue of destinations that need to be woken up due to new presence
updates.

Staggers waking up of per destination queues to ensure that we don't attempt
to start TLS connections with many hosts all at once, leading to pinned CPU.
"""

# The maximum duration in seconds between queuing up a destination and it
# being woken up.
_MAX_TIME_IN_QUEUE = 30.0

# The maximum duration in seconds between waking up consecutive destination
# queues.
_MAX_DELAY = 0.1

sender: "FederationSender" = attr.ib()
clock: Clock = attr.ib()
queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
processing: bool = attr.ib(default=False)

def add_to_queue(self, destination: str) -> None:
"""Add a destination to the queue to be woken up."""

self.queue[destination] = None

if not self.processing:
self._handle()

@wrap_as_background_process("_PresenceQueue.handle")
async def _handle(self) -> None:
"""Background process to drain the queue."""

if not self.queue:
return

assert not self.processing
self.processing = True

try:
# We start with a delay that should drain queue quickly enough that
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# we process all destinations in the queue in _MAX_TIME_IN_QUEUE
# seconds.
#
# We also add an upper bound to the delay, to gracefully handle the
# case where the queue only has a few entries in it.
current_sleep_seconds = min(
self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
)

while self.queue:
destination, _ = self.queue.popitem(last=False)

queue = self.sender._get_per_destination_queue(destination)

if not queue._new_data_to_send:
# The per destination queue has already been woken up.
continue

queue.attempt_new_transaction()

await self.clock.sleep(current_sleep_seconds)

if not self.queue:
break

# More destinations may have been added to the queue, so we may
# need to reduce the delay to ensure everything gets processed
# with _MAX_TIME_IN_QUEUE seconds.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
current_sleep_seconds = min(
current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
)

finally:
self.processing = False


class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
Expand Down Expand Up @@ -210,6 +295,8 @@ def __init__(self, hs: "HomeServer"):

self._external_cache = hs.get_external_cache()

self._presence_queue = _PresenceQueue(self, self.clock)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination

Expand Down Expand Up @@ -519,7 +606,12 @@ def send_presence_to_destinations(
self._instance_name, destination
):
continue
self._get_per_destination_queue(destination).send_presence(states)

self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
)

self._presence_queue.add_to_queue(destination)

def build_and_send_edu(
self,
Expand Down
16 changes: 13 additions & 3 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,24 @@ def send_pdu(self, pdu: EventBase) -> None:

self.attempt_new_transaction()

def send_presence(self, states: Iterable[UserPresenceState]) -> None:
"""Add presence updates to the queue. Start the transmission loop if necessary.
def send_presence(
self, states: Iterable[UserPresenceState], start_loop: bool = True
) -> None:
"""Add presence updates to the queue.

Args:
states: Presence updates to send
start_loop: Whether to start the transmission loop if not already
running.

Args:
states: presence to send
"""
self._pending_presence.update({state.user_id: state for state in states})
self.attempt_new_transaction()
self._new_data_to_send = True

if start_loop:
self.attempt_new_transaction()

def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
Expand Down
8 changes: 8 additions & 0 deletions tests/events/test_presence_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ def test_send_local_online_presence_to_with_module(self):
presence_updates, _ = sync_presence(self, self.presence_receiving_user_two_id)
self.assertEqual(len(presence_updates), 3)

# We stagger sending of presence, so we need to wait a bit for them to
# get sent out.
self.reactor.advance(60)

# Test that sending to a remote user works
remote_user_id = "@far_away_person:island"

Expand All @@ -301,6 +305,10 @@ def test_send_local_online_presence_to_with_module(self):
self.module_api.send_local_online_presence_to([remote_user_id])
)

# We stagger sending of presence, so we need to wait a bit for them to
# get sent out.
self.reactor.advance(60)

# Check that the expected presence updates were sent
# We explicitly compare using sets as we expect that calling
# module_api.send_local_online_presence_to will create a presence
Expand Down