Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix ratelimiting for federation /send requests. (#8342)
Browse files Browse the repository at this point in the history
c.f. #8295 for rationale
  • Loading branch information
erikjohnston committed Sep 18, 2020
1 parent ad055ea commit 14b5b48
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog.d/8342.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ratelimitng of federation `/send` requests.
52 changes: 40 additions & 12 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,16 @@ def __init__(self, hs):
self.state = hs.get_state_handler()

self.device_handler = hs.get_device_handler()
self._federation_ratelimiter = hs.get_federation_ratelimiter()

self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")

# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
hs, "fed_txn_handler", timeout_ms=30000
)

self.transaction_actions = TransactionActions(self.store)

self.registry = hs.get_federation_registry()
Expand Down Expand Up @@ -135,22 +141,44 @@ async def on_incoming_transaction(
request_time = self._clock.time_msec()

transaction = Transaction(**transaction_data)
transaction_id = transaction.transaction_id # type: ignore

if not transaction.transaction_id: # type: ignore
if not transaction_id:
raise Exception("Transaction missing transaction_id")

logger.debug("[%s] Got transaction", transaction.transaction_id) # type: ignore
logger.debug("[%s] Got transaction", transaction_id)

# use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel.
with (
await self._transaction_linearizer.queue(
(origin, transaction.transaction_id) # type: ignore
)
):
result = await self._handle_incoming_transaction(
origin, transaction, request_time
)
# We wrap in a ResponseCache so that we de-duplicate retried
# transactions.
return await self._transaction_resp_cache.wrap(
(origin, transaction_id),
self._on_incoming_transaction_inner,
origin,
transaction,
request_time,
)

async def _on_incoming_transaction_inner(
self, origin: str, transaction: Transaction, request_time: int
) -> Tuple[int, Dict[str, Any]]:
# Use a linearizer to ensure that transactions from a remote are
# processed in order.
with await self._transaction_linearizer.queue(origin):
# We rate limit here *after* we've queued up the incoming requests,
# so that we don't fill up the ratelimiter with blocked requests.
#
# This is important as the ratelimiter allows N concurrent requests
# at a time, and only starts ratelimiting if there are more requests
# than that being processed at a time. If we queued up requests in
# the linearizer/response cache *after* the ratelimiting then those
# queued up requests would count as part of the allowed limit of N
# concurrent requests.
with self._federation_ratelimiter.ratelimit(origin) as d:
await d

result = await self._handle_incoming_transaction(
origin, transaction, request_time
)

return result

Expand Down
13 changes: 8 additions & 5 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
)
from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)
Expand All @@ -72,9 +71,7 @@ def __init__(self, hs, servlet_groups=None):
super(TransportLayerServer, self).__init__(hs, canonical_json=False)

self.authenticator = Authenticator(hs)
self.ratelimiter = FederationRateLimiter(
self.clock, config=hs.config.rc_federation
)
self.ratelimiter = hs.get_federation_ratelimiter()

self.register_servlets()

Expand Down Expand Up @@ -272,6 +269,8 @@ class BaseFederationServlet:

PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version

RATELIMIT = True # Whether to rate limit requests or not

def __init__(self, handler, authenticator, ratelimiter, server_name):
self.handler = handler
self.authenticator = authenticator
Expand Down Expand Up @@ -335,7 +334,7 @@ async def new_func(request, *args, **kwargs):
)

with scope:
if origin:
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
if request._disconnected:
Expand Down Expand Up @@ -372,6 +371,10 @@ def register(self, server):
class FederationSendServlet(BaseFederationServlet):
PATH = "/send/(?P<transaction_id>[^/]*)/?"

# We ratelimit manually in the handler as we queue up the requests and we
# don't want to fill up the ratelimiter with blocked requests.
RATELIMIT = False

def __init__(self, handler, server_name, **kwargs):
super(FederationSendServlet, self).__init__(
handler, server_name=server_name, **kwargs
Expand Down
5 changes: 5 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
from synapse.types import DomainSpecificString
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -642,6 +643,10 @@ def get_replication_data_handler(self) -> ReplicationDataHandler:
def get_replication_streams(self) -> Dict[str, Stream]:
return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}

@cache_in_self
def get_federation_ratelimiter(self) -> FederationRateLimiter:
return FederationRateLimiter(self.clock, config=self.config.rc_federation)

async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

Expand Down

0 comments on commit 14b5b48

Please sign in to comment.