Fix rate limit metrics registering twice and misreporting
Fix #13641
MadLittleMods authored and David Robertson committed Aug 30, 2022
1 parent ea85a2b commit 8be5d7c
Showing 2 changed files with 110 additions and 32 deletions.
4 changes: 3 additions & 1 deletion synapse/server.py
@@ -756,7 +756,9 @@ def get_replication_streams(self) -> Dict[str, Stream]:
     @cache_in_self
     def get_federation_ratelimiter(self) -> FederationRateLimiter:
         return FederationRateLimiter(
-            self.get_clock(), config=self.config.ratelimiting.rc_federation
+            self.get_clock(),
+            config=self.config.ratelimiting.rc_federation,
+            metrics_name="federation_servlets",
         )
 
     @cache_in_self
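
The only change at the call site is the new metrics_name argument, which names this limiter's metric series. The name matters because of the failure mode in the commit title: metrics created per instance can collide in prometheus_client's global registry. A minimal sketch of that collision, using plain prometheus_client with a made-up metric name (not Synapse code):

from prometheus_client import Counter

Counter("demo_rate_limit_sleep", "First registration succeeds")
try:
    Counter("demo_rate_limit_sleep", "Second registration collides")
except ValueError as err:
    # Duplicated timeseries in CollectorRegistry: ...
    print(err)
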
138 changes: 107 additions & 31 deletions synapse/util/ratelimitutils.py
@@ -15,8 +15,9 @@
 import collections
 import contextlib
 import logging
+import threading
 import typing
-from typing import Any, DefaultDict, Iterator, List, Set
+from typing import Any, DefaultDict, Dict, Iterator, List, Mapping, Optional, Set, Tuple
 
 from prometheus_client.core import Counter
 
@@ -40,12 +41,20 @@
 
 
 # Track how much the ratelimiter is affecting requests
-rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
-rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
+rate_limit_sleep_counter = Counter(
+    "synapse_rate_limit_sleep",
+    "Number of requests slept by the rate limiter",
+    ["rate_limiter_name"],
+)
+rate_limit_reject_counter = Counter(
+    "synapse_rate_limit_reject",
+    "Number of requests rejected by the rate limiter",
+    ["rate_limiter_name"],
+)
 queue_wait_timer = Histogram(
     "synapse_rate_limit_queue_wait_time_seconds",
-    "sec",
-    [],
+    "Amount of time spent waiting for the rate limiter to let our request through.",
+    ["rate_limiter_name"],
     buckets=(
         0.005,
         0.01,
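
The counters and histogram keep their metric names but gain a rate_limiter_name label, so a single collector registered at module load serves every limiter; each instance only contributes its own label value. A sketch of the labelled-counter pattern, assuming prometheus_client is installed and using an illustrative metric name:

from prometheus_client import Counter, generate_latest

sleep_counter = Counter(
    "demo_rate_limit_sleep",
    "Number of requests slept by the rate limiter",
    ["rate_limiter_name"],
)

# Each limiter increments its own labelled series; nothing new is
# registered after module load.
sleep_counter.labels("federation_servlets").inc()
sleep_counter.labels("another_limiter").inc(2)

print(generate_latest().decode())
# ...among the output:
# demo_rate_limit_sleep_total{rate_limiter_name="federation_servlets"} 1.0
# demo_rate_limit_sleep_total{rate_limiter_name="another_limiter"} 2.0
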
@@ -65,35 +74,89 @@
 )
 
 
+_rate_limiter_instances: Set["FederationRateLimiter"] = set()
+# Protects the _rate_limiter_instances set from concurrent access
+_rate_limiter_instances_lock = threading.Lock()
+
+
+def _get_counts_from_rate_limiter_instance(count_func) -> Mapping[Tuple[str, ...], int]:
+    """Returns a count of something (slept/rejected hosts) by (metrics_name)"""
+    # Cast to a list to prevent it changing while the Prometheus
+    # thread is collecting metrics
+    with _rate_limiter_instances_lock:
+        rate_limiter_instances = list(_rate_limiter_instances)
+
+    # Map from (metrics_name,) -> int, the number of something like slept hosts
+    # or rejected hosts. The key type is Tuple[str], but we leave the length
+    # unspecified for compatibility with LaterGauge's annotations.
+    counts: Dict[Tuple[str, ...], int] = {}
+    for rate_limiter_instance in rate_limiter_instances:
+        # Only track metrics if they provided a `metrics_name` to
+        # differentiate this instance of the rate limiter.
+        if rate_limiter_instance.metrics_name:
+            key = (rate_limiter_instance.metrics_name,)
+            counts[key] = count_func(rate_limiter_instance)
+
+    return counts
+
+
+# We track the number of affected hosts per time-period so we can
+# differentiate one really noisy homeserver from a general
+# ratelimit tuning problem across the federation.
+LaterGauge(
+    "synapse_rate_limit_sleep_affected_hosts",
+    "Number of hosts that had requests put to sleep",
+    ["rate_limiter_name"],
+    lambda: _get_counts_from_rate_limiter_instance(
+        lambda rate_limiter_instance: sum(
+            ratelimiter.should_sleep()
+            for ratelimiter in rate_limiter_instance.ratelimiters.values()
+        )
+    ),
+)
+LaterGauge(
+    "synapse_rate_limit_reject_affected_hosts",
+    "Number of hosts that had requests rejected",
+    ["rate_limiter_name"],
+    lambda: _get_counts_from_rate_limiter_instance(
+        lambda rate_limiter_instance: sum(
+            ratelimiter.should_reject()
+            for ratelimiter in rate_limiter_instance.ratelimiters.values()
+        )
+    ),
+)
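
LaterGauge is Synapse's helper for gauges evaluated at scrape time; its callback here returns a mapping from label tuples to values. As an approximation (this is not Synapse's implementation), the same shape can be built with a plain prometheus_client custom collector, registered once at module load so that creating limiter instances later never re-registers anything:

import threading
from typing import Iterable, Set

from prometheus_client.core import REGISTRY, GaugeMetricFamily

_instances: Set["DemoLimiter"] = set()
_instances_lock = threading.Lock()


class DemoLimiter:
    """Stands in for FederationRateLimiter: it only records itself in a set."""

    def __init__(self, metrics_name: str, slept_hosts: int) -> None:
        self.metrics_name = metrics_name
        self.slept_hosts = slept_hosts
        with _instances_lock:
            _instances.add(self)


class _AffectedHostsCollector:
    """Computes gauge values on scrape, like LaterGauge's callback."""

    def collect(self) -> Iterable[GaugeMetricFamily]:
        gauge = GaugeMetricFamily(
            "demo_sleep_affected_hosts",
            "Number of hosts that had requests put to sleep",
            labels=["rate_limiter_name"],
        )
        with _instances_lock:
            instances = list(_instances)
        for instance in instances:
            gauge.add_metric([instance.metrics_name], instance.slept_hosts)
        yield gauge


# Registered exactly once, however many DemoLimiter instances are created.
REGISTRY.register(_AffectedHostsCollector())
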


 class FederationRateLimiter:
-    def __init__(self, clock: Clock, config: FederationRatelimitSettings):
-        """Used to rate limit request per-host."""
-
+    def __init__(
+        self,
+        clock: Clock,
+        config: FederationRatelimitSettings,
+        metrics_name: Optional[str],
+    ):
+        """
+        Args:
+            clock
+            config
+            metrics_name: The name of the rate limiter so we can differentiate it
+                from the rest in the metrics
+        """
+        self.metrics_name = metrics_name
+
         def new_limiter() -> "_PerHostRatelimiter":
-            return _PerHostRatelimiter(clock=clock, config=config)
+            return _PerHostRatelimiter(
+                clock=clock, config=config, metrics_name=metrics_name
+            )
 
         self.ratelimiters: DefaultDict[
             str, "_PerHostRatelimiter"
         ] = collections.defaultdict(new_limiter)
 
-        # We track the number of affected hosts per time-period so we can
-        # differentiate one really noisy homeserver from a general
-        # ratelimit tuning problem across the federation.
-        LaterGauge(
-            "synapse_rate_limit_sleep_affected_hosts",
-            "Number of hosts that had requests put to sleep",
-            [],
-            lambda: sum(
-                ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
-            ),
-        )
-        LaterGauge(
-            "synapse_rate_limit_reject_affected_hosts",
-            "Number of hosts that had requests rejected",
-            [],
-            lambda: sum(
-                ratelimiter.should_reject()
-                for ratelimiter in self.ratelimiters.values()
-            ),
-        )
+        with _rate_limiter_instances_lock:
+            _rate_limiter_instances.add(self)
 
     def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
         """Used to ratelimit an incoming request from a given host
@@ -114,13 +177,21 @@ def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
 
 
 class _PerHostRatelimiter:
-    def __init__(self, clock: Clock, config: FederationRatelimitSettings):
+    def __init__(
+        self,
+        clock: Clock,
+        config: FederationRatelimitSettings,
+        metrics_name: Optional[str],
+    ):
         """
         Args:
             clock
             config
+            metrics_name: The name of the rate limiter so we can differentiate it
+                from the rest in the metrics
         """
         self.clock = clock
+        self.metrics_name = metrics_name
 
         self.window_size = config.window_size
         self.sleep_limit = config.sleep_limit
@@ -178,7 +249,10 @@ def should_sleep(self) -> bool:
         return len(self.request_times) > self.sleep_limit
 
     async def _on_enter_with_tracing(self, request_id: object) -> None:
-        with start_active_span("ratelimit wait"), queue_wait_timer.time():
+        maybe_metrics_cm = contextlib.nullcontext()
+        if self.metrics_name:
+            maybe_metrics_cm = queue_wait_timer.labels(self.metrics_name).time()
+        with start_active_span("ratelimit wait"), maybe_metrics_cm:
             await self._on_enter(request_id)
 
     def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
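
Because metrics_name is optional, the histogram observation has to be conditional; contextlib.nullcontext() keeps the with statement itself unconditional. A runnable sketch of the same trick outside Synapse, with an illustrative metric name:

import contextlib
from typing import Optional

from prometheus_client import Histogram

wait_timer = Histogram("demo_queue_wait_seconds", "Demo wait timer", ["name"])


def timed_wait(metrics_name: Optional[str] = None) -> None:
    maybe_cm = contextlib.nullcontext()
    if metrics_name:
        maybe_cm = wait_timer.labels(metrics_name).time()
    with maybe_cm:
        pass  # the rate-limited wait would happen here


timed_wait("federation_servlets")  # observed under name="federation_servlets"
timed_wait()  # runs, but records nothing
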
@@ -193,7 +267,8 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
         # sleeping or in the ready queue).
         if self.should_reject():
             logger.debug("Ratelimiter(%s): rejecting request", self.host)
-            rate_limit_reject_counter.inc()
+            if self.metrics_name:
+                rate_limit_reject_counter.labels(self.metrics_name).inc()
             raise LimitExceededError(
                 retry_after_ms=int(self.window_size / self.sleep_limit)
             )
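
For scale: assuming an unmodified rc_federation config (window_size of 1000 ms and sleep_limit of 10 are the defaults), the rejection above asks the remote server to retry after int(1000 / 10) = 100 ms.
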
@@ -228,7 +303,8 @@ def queue_request() -> "defer.Deferred[None]":
                 id(request_id),
                 self.sleep_sec,
             )
-            rate_limit_sleep_counter.inc()
+            if self.metrics_name:
+                rate_limit_sleep_counter.labels(self.metrics_name).inc()
             ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
 
             self.sleeping_requests.add(request_id)
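
After this change the counters are exported as synapse_rate_limit_sleep_total and synapse_rate_limit_reject_total (prometheus_client appends _total to counter names), each split by rate_limiter_name, and a limiter constructed without a metrics_name simply opts out of metrics instead of re-registering them.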
