From 66d63c022dfa9c5452c6df4bc73a23a2d4a90b6f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 20 Sep 2021 12:59:02 +0100 Subject: [PATCH 01/15] Loosen types of database callbacks to allow any return value --- synapse/storage/database.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0084d9f96ccc..4263c7d76169 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -179,7 +179,7 @@ def __getattr__(self, name): # The type of entry which goes on our after_callbacks and exception_callbacks lists. -_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]] +_CallbackListEntry = Tuple[Callable[..., object], Iterable[Any], Dict[str, Any]] R = TypeVar("R") @@ -226,7 +226,7 @@ def __init__( self.after_callbacks = after_callbacks self.exception_callbacks = exception_callbacks - def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any): + def call_after(self, callback: Callable[..., object], *args: Any, **kwargs: Any): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. 
@@ -238,7 +238,7 @@ def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any): self.after_callbacks.append((callback, args, kwargs)) def call_on_exception( - self, callback: Callable[..., None], *args: Any, **kwargs: Any + self, callback: Callable[..., object], *args: Any, **kwargs: Any ): # if self.exception_callbacks is None, that means that whatever constructed the # LoggingTransaction isn't expecting there to be any callbacks; assert that From 934fe41cd20b43662e5051611e5a8b111efdd460 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 20 Sep 2021 13:01:24 +0100 Subject: [PATCH 02/15] Fix return type of `GroupAttestionRenewer._start_renew_attestations()` --- synapse/groups/attestations.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 53f99031b1de..2e2a248fa45f 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -40,6 +40,8 @@ from signedjson.sign import sign_json +from twisted.internet.defer import Deferred + from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import JsonDict, get_domain_from_id @@ -166,7 +168,7 @@ async def on_renew_attestation( return {} - def _start_renew_attestations(self) -> None: + def _start_renew_attestations(self) -> Deferred[None]: return run_as_background_process("renew_attestations", self._renew_attestations) async def _renew_attestations(self) -> None: From f058e4f60204ac2eedce842e07cd8ced75856d2b Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 20 Sep 2021 13:04:27 +0100 Subject: [PATCH 03/15] Add type annotations to `synapse.metrics` --- changelog.d/10847.misc | 1 + mypy.ini | 3 + synapse/metrics/__init__.py | 93 +++++++++++------ synapse/metrics/_exposition.py | 26 ++--- synapse/metrics/background_process_metrics.py | 99 ++++++++++++++++--- 
synapse/metrics/jemalloc.py | 10 +- 6 files changed, 172 insertions(+), 60 deletions(-) create mode 100644 changelog.d/10847.misc diff --git a/changelog.d/10847.misc b/changelog.d/10847.misc new file mode 100644 index 000000000000..7933a38dca80 --- /dev/null +++ b/changelog.d/10847.misc @@ -0,0 +1 @@ +Add type annotations to `synapse.metrics`. diff --git a/mypy.ini b/mypy.ini index b21e1555ab7f..45cb1984e5c5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -91,6 +91,9 @@ files = tests/util/test_itertools.py, tests/util/test_stream_change_cache.py +[mypy-synapse.metrics.*] +disallow_untyped_defs = True + [mypy-synapse.rest.*] disallow_untyped_defs = True diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index f237b8a2369e..a4e0b1678102 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -20,10 +20,22 @@ import platform import threading import time -from typing import Callable, Dict, Iterable, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + Optional, + Sequence, + Set, + Tuple, + TypeVar, + Union, + cast, +) import attr -from prometheus_client import Counter, Gauge, Histogram +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric from prometheus_client.core import ( REGISTRY, CounterMetricFamily, @@ -32,6 +44,7 @@ ) from twisted.internet import reactor +from twisted.internet.base import ReactorBase import synapse from synapse.metrics._exposition import ( @@ -53,7 +66,7 @@ class RegistryProxy: @staticmethod - def collect(): + def collect() -> Iterable[Metric]: for metric in REGISTRY.collect(): if not metric.name.startswith("__"): yield metric @@ -69,7 +82,7 @@ class LaterGauge: # or dict mapping from a label tuple to a value caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]]) - def collect(self): + def collect(self) -> Iterable[Metric]: g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) @@ -88,10 +101,10 @@ def 
collect(self): yield g - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: self._register() - def _register(self): + def _register(self) -> None: if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) @@ -100,6 +113,10 @@ def _register(self): all_gauges[self.name] = self +# Placeholder for `InFlightGauge._metrics_class`, which is created dynamically +_MetricsEntry = Any + + class InFlightGauge: """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. @@ -110,14 +127,19 @@ class InFlightGauge: callbacks. Args: - name (str) - desc (str) - labels (list[str]) - sub_metrics (list[str]): A list of sub metrics that the callbacks - will update. + name + desc + labels + sub_metrics: A list of sub metrics that the callbacks will update. """ - def __init__(self, name, desc, labels, sub_metrics): + def __init__( + self, + name: str, + desc: str, + labels: Sequence[str], + sub_metrics: Sequence[str], + ): self.name = name self.desc = desc self.labels = labels @@ -130,14 +152,20 @@ def __init__(self, name, desc, labels, sub_metrics): ) # Counts number of in flight blocks for a given set of label values - self._registrations: Dict = {} + self._registrations: Dict[ + Tuple[str, ...], Set[Callable[[_MetricsEntry], None]] + ] = {} # Protects access to _registrations self._lock = threading.Lock() self._register_with_collector() - def register(self, key, callback): + def register( + self, + key: Tuple[str, ...], + callback: Callable[[_MetricsEntry], None], + ) -> None: """Registers that we've entered a new block with labels `key`. `callback` gets called each time the metrics are collected. 
The same @@ -153,13 +181,17 @@ def register(self, key, callback): with self._lock: self._registrations.setdefault(key, set()).add(callback) - def unregister(self, key, callback): + def unregister( + self, + key: Tuple[str, ...], + callback: Callable[[_MetricsEntry], None], + ) -> None: """Registers that we've exited a block with labels `key`.""" with self._lock: self._registrations.setdefault(key, set()).discard(callback) - def collect(self): + def collect(self) -> Iterable[Metric]: """Called by prometheus client when it reads metrics. Note: may be called by a separate thread. @@ -195,7 +227,7 @@ def collect(self): gauge.add_metric(key, getattr(metrics, name)) yield gauge - def _register_with_collector(self): + def _register_with_collector(self) -> None: if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) @@ -225,7 +257,7 @@ def __init__( name: str, documentation: str, buckets: Iterable[float], - registry=REGISTRY, + registry: CollectorRegistry = REGISTRY, ): """ Args: @@ -252,12 +284,12 @@ def __init__( registry.register(self) - def collect(self): + def collect(self) -> Iterable[Metric]: # Don't report metrics unless we've already collected some data if self._metric is not None: yield self._metric - def update_data(self, values: Iterable[float]): + def update_data(self, values: Iterable[float]) -> None: """Update the data to be reported by the metric The existing data is cleared, and each measurement in the input is assigned @@ -299,7 +331,7 @@ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFami class CPUMetrics: - def __init__(self): + def __init__(self) -> None: ticks_per_sec = 100 try: # Try and get the system config @@ -309,7 +341,7 @@ def __init__(self): self.ticks_per_sec = ticks_per_sec - def collect(self): + def collect(self) -> Iterable[Metric]: if not HAVE_PROC_SELF_STAT: return @@ -359,7 +391,7 @@ def collect(self): class 
GCCounts: - def collect(self): + def collect(self) -> Iterable[Metric]: cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) @@ -377,7 +409,7 @@ def collect(self): class PyPyGCStats: - def collect(self): + def collect(self) -> Iterable[Metric]: # @stats is a pretty-printer object with __str__() returning a nice table, # plus some fields that contain data from that table. @@ -524,7 +556,7 @@ def collect(self): class ReactorLastSeenMetric: - def collect(self): + def collect(self) -> Iterable[Metric]: cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", "Seconds since the Twisted reactor was last seen", @@ -543,9 +575,12 @@ def collect(self): _last_gc = [0.0, 0.0, 0.0] -def runUntilCurrentTimer(reactor, func): +F = TypeVar("F", bound=Callable[..., Any]) + + +def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F: @functools.wraps(func) - def f(*args, **kwargs): + def f(*args: Any, **kwargs: Any) -> Any: now = reactor.seconds() num_pending = 0 @@ -608,7 +643,7 @@ def f(*args, **kwargs): return ret - return f + return cast(F, f) try: @@ -636,5 +671,5 @@ def f(*args, **kwargs): "start_http_server", "LaterGauge", "InFlightGauge", - "BucketCollector", + "GaugeBucketCollector", ] diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index bb9bcb5592ed..5f83f99684e9 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -25,12 +25,14 @@ import threading from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Dict, List +from typing import Any, Dict, List, Type from urllib.parse import parse_qs, urlparse -from prometheus_client import REGISTRY +from prometheus_client import REGISTRY, CollectorRegistry +from prometheus_client.core import Sample from twisted.web.resource import Resource +from twisted.web.server import Request from synapse.util import caches @@ 
-41,7 +43,7 @@ MINUS_INF = float("-inf") -def floatToGoString(d): +def floatToGoString(d: Any) -> str: d = float(d) if d == INF: return "+Inf" @@ -60,7 +62,7 @@ def floatToGoString(d): return s -def sample_line(line, name): +def sample_line(line: Sample, name: str) -> str: if line.labels: labelstr = "{{{0}}}".format( ",".join( @@ -82,7 +84,7 @@ def sample_line(line, name): return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp) -def generate_latest(registry, emit_help=False): +def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> bytes: # Trigger the cache metrics to be rescraped, which updates the common # metrics but do not produce metrics themselves @@ -187,7 +189,7 @@ class MetricsHandler(BaseHTTPRequestHandler): registry = REGISTRY - def do_GET(self): + def do_GET(self) -> None: registry = self.registry params = parse_qs(urlparse(self.path).query) @@ -207,11 +209,11 @@ def do_GET(self): self.end_headers() self.wfile.write(output) - def log_message(self, format, *args): + def log_message(self, format: str, *args: Any) -> None: """Log nothing.""" @classmethod - def factory(cls, registry): + def factory(cls, registry: CollectorRegistry) -> Type: """Returns a dynamic MetricsHandler class tied to the passed registry. 
""" @@ -236,7 +238,9 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer): daemon_threads = True -def start_http_server(port, addr="", registry=REGISTRY): +def start_http_server( + port: int, addr: str = "", registry: CollectorRegistry = REGISTRY +) -> None: """Starts an HTTP server for prometheus metrics as a daemon thread""" CustomMetricsHandler = MetricsHandler.factory(registry) httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler) @@ -252,10 +256,10 @@ class MetricsResource(Resource): isLeaf = True - def __init__(self, registry=REGISTRY): + def __init__(self, registry: CollectorRegistry = REGISTRY): self.registry = registry - def render_GET(self, request): + def render_GET(self, request: Request) -> bytes: request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) response = generate_latest(self.registry) request.setHeader(b"Content-Length", str(len(response))) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 3a14260752ed..c2ca89b25ee1 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -15,13 +15,33 @@ import logging import threading from functools import wraps -from typing import TYPE_CHECKING, Dict, Optional, Set, Union +from types import TracebackType +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + Iterable, + Optional, + Set, + Type, + TypeVar, + Union, + cast, + overload, +) +from prometheus_client import Metric from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer -from synapse.logging.context import LoggingContext, PreserveLoggingContext +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + PreserveLoggingContext, +) from synapse.logging.opentracing import ( SynapseTags, noop_context_manager, @@ -116,7 +136,7 @@ class _Collector: before they are returned. 
""" - def collect(self): + def collect(self) -> Iterable[Metric]: global _background_processes_active_since_last_scrape # We swap out the _background_processes set with an empty one so that @@ -144,12 +164,12 @@ def collect(self): class _BackgroundProcess: - def __init__(self, desc, ctx): + def __init__(self, desc: str, ctx: LoggingContext): self.desc = desc self._context = ctx - self._reported_stats = None + self._reported_stats: Optional[ContextResourceUsage] = None - def update_metrics(self): + def update_metrics(self) -> None: """Updates the metrics with values from this process.""" new_stats = self._context.get_resource_usage() if self._reported_stats is None: @@ -169,7 +189,41 @@ def update_metrics(self): ) -def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs): +R = TypeVar("R") + + +@overload +def run_as_background_process( # type: ignore[misc] + desc: str, + func: Callable[..., Awaitable[R]], + *args: Any, + bg_start_span: bool = True, + **kwargs: Any, +) -> defer.Deferred[Optional[R]]: + ... + + +@overload +def run_as_background_process( + desc: str, + func: Callable[..., R], + *args: Any, + bg_start_span: bool = True, + **kwargs: Any, +) -> defer.Deferred[Optional[R]]: + ... + + +def run_as_background_process( + desc: str, + func: Union[ + Callable[..., Awaitable[R]], + Callable[..., R], + ], + *args: Any, + bg_start_span: bool = True, + **kwargs: Any, +) -> defer.Deferred[Optional[R]]: """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -189,11 +243,12 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar args: positional args for func kwargs: keyword args for func - Returns: Deferred which returns the result of func, but note that it does not - follow the synapse logcontext rules. + Returns: Deferred which returns the result of func, or `None` if func raises. 
+ Note that the returned Deferred does not follow the synapse logcontext + rules. """ - async def run(): + async def run() -> Optional[R]: with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -216,6 +271,7 @@ async def run(): "Background process '%s' threw an exception", desc, ) + return None finally: _background_process_in_flight_count.labels(desc).dec() @@ -225,19 +281,25 @@ async def run(): return defer.ensureDeferred(run()) -def wrap_as_background_process(desc): +F = TypeVar("F", bound=Callable[..., Any]) + + +def wrap_as_background_process(desc: str) -> Callable[[F], F]: """Decorator that wraps a function that gets called as a background process. Equivalent of calling the function with `run_as_background_process` """ - def wrap_as_background_process_inner(func): + # NB: Return type is incorrect and should be F with a Deferred[Optional[R]] return + def wrap_as_background_process_inner(func: F) -> F: @wraps(func) - def wrap_as_background_process_inner_2(*args, **kwargs): + def wrap_as_background_process_inner_2( + *args: Any, **kwargs: Any + ) -> defer.Deferred[Optional[R]]: return run_as_background_process(desc, func, *args, **kwargs) - return wrap_as_background_process_inner_2 + return cast(F, wrap_as_background_process_inner_2) return wrap_as_background_process_inner @@ -265,7 +327,7 @@ def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None): super().__init__("%s-%s" % (name, instance_id)) self._proc = _BackgroundProcess(name, self) - def start(self, rusage: "Optional[resource._RUsage]"): + def start(self, rusage: "Optional[resource._RUsage]") -> None: """Log context has started running (again).""" super().start(rusage) @@ -276,7 +338,12 @@ def start(self, rusage: "Optional[resource._RUsage]"): with _bg_metrics_lock: _background_processes_active_since_last_scrape.add(self._proc) - def __exit__(self, type, value, traceback) -> None: + def __exit__( + self, + type: 
Optional[Type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: """Log context has finished.""" super().__exit__(type, value, traceback) diff --git a/synapse/metrics/jemalloc.py b/synapse/metrics/jemalloc.py index 29ab6c0229df..98ed9c0829e8 100644 --- a/synapse/metrics/jemalloc.py +++ b/synapse/metrics/jemalloc.py @@ -16,14 +16,16 @@ import logging import os import re -from typing import Optional +from typing import Iterable, Optional + +from prometheus_client import Metric from synapse.metrics import REGISTRY, GaugeMetricFamily logger = logging.getLogger(__name__) -def _setup_jemalloc_stats(): +def _setup_jemalloc_stats() -> None: """Checks to see if jemalloc is loaded, and hooks up a collector to record statistics exposed by jemalloc. """ @@ -135,7 +137,7 @@ def _jemalloc_refresh_stats() -> None: class JemallocCollector: """Metrics for internal jemalloc stats.""" - def collect(self): + def collect(self) -> Iterable[Metric]: _jemalloc_refresh_stats() g = GaugeMetricFamily( @@ -185,7 +187,7 @@ def collect(self): logger.debug("Added jemalloc stats") -def setup_jemalloc_stats(): +def setup_jemalloc_stats() -> None: """Try to setup jemalloc stats, if jemalloc is loaded.""" try: From 949439c652b3cae6071f4b24c8df862d51987006 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Mon, 20 Sep 2021 15:38:52 +0100 Subject: [PATCH 04/15] Update `run_as_background_process` docstring Co-authored-by: Patrick Cloke --- synapse/metrics/background_process_metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index c2ca89b25ee1..db0e7976edac 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -243,7 +243,8 @@ def run_as_background_process( args: positional args for func kwargs: keyword args for func - Returns: Deferred 
which returns the result of func, or `None` if func raises. + Returns: + Deferred which returns the result of func, or `None` if func raises. Note that the returned Deferred does not follow the synapse logcontext rules. """ From d55eb93b7005bd553a7878383e9cfbebbdadc103 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Mon, 20 Sep 2021 15:59:09 +0100 Subject: [PATCH 05/15] Quote return type of `GroupAttestationRenewer._start_renew_attestations()` Co-authored-by: Patrick Cloke --- synapse/groups/attestations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 2e2a248fa45f..a87896e5386c 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -168,7 +168,7 @@ async def on_renew_attestation( return {} - def _start_renew_attestations(self) -> Deferred[None]: + def _start_renew_attestations(self) -> "Deferred[None]": return run_as_background_process("renew_attestations", self._renew_attestations) async def _renew_attestations(self) -> None: From ab66469175e832b4f80e03f92a6100d62b72e5c8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 20 Sep 2021 15:57:52 +0100 Subject: [PATCH 06/15] Improve and add comments about `run_as_background_process` --- synapse/metrics/background_process_metrics.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index db0e7976edac..6cf858fc09f4 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -201,6 +201,8 @@ def run_as_background_process( # type: ignore[misc] **kwargs: Any, ) -> defer.Deferred[Optional[R]]: ...
+ # The `type: ignore[misc]` above suppresses + # "error: Overloaded function signatures 1 and 2 overlap with incompatible return types [misc]" @overload @@ -285,11 +287,16 @@ async def run() -> Optional[R]: F = TypeVar("F", bound=Callable[..., Any]) +# NB: Return type is incorrect and should be a callable returning an F with a +# Deferred[Optional[R]] return, which we can't express correctly until Python 3.10. def wrap_as_background_process(desc: str) -> Callable[[F], F]: """Decorator that wraps a function that gets called as a background process. - Equivalent of calling the function with `run_as_background_process` + Equivalent to calling the function with `run_as_background_process`. + + Note that `run_as_background_process` changes the return type into + `Deferred[Optional[T]]`. """ # NB: Return type is incorrect and should be F with a Deferred[Optional[R]] return From d1ee5da42e0c58ede66c62976006d9165e1e8bbf Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 20 Sep 2021 16:03:42 +0100 Subject: [PATCH 07/15] Quote Deferred return types --- synapse/metrics/background_process_metrics.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 6cf858fc09f4..b63b15d79d84 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -199,7 +199,7 @@ def run_as_background_process( # type: ignore[misc] *args: Any, bg_start_span: bool = True, **kwargs: Any, -) -> defer.Deferred[Optional[R]]: +) -> "defer.Deferred[Optional[R]]": ... # The `type: ignore[misc]` above suppresses # "error: Overloaded function signatures 1 and 2 overlap with incompatible return types [misc]" @@ -212,7 +212,7 @@ def run_as_background_process( *args: Any, bg_start_span: bool = True, **kwargs: Any, -) -> defer.Deferred[Optional[R]]: +) -> "defer.Deferred[Optional[R]]": ... 
@@ -225,7 +225,7 @@ def run_as_background_process( *args: Any, bg_start_span: bool = True, **kwargs: Any, -) -> defer.Deferred[Optional[R]]: +) -> "defer.Deferred[Optional[R]]": """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -304,7 +304,7 @@ def wrap_as_background_process_inner(func: F) -> F: @wraps(func) def wrap_as_background_process_inner_2( *args: Any, **kwargs: Any - ) -> defer.Deferred[Optional[R]]: + ) -> "defer.Deferred[Optional[R]]": return run_as_background_process(desc, func, *args, **kwargs) return cast(F, wrap_as_background_process_inner_2) From 6924ce8d468fce9f59410ceeeabf1ac50810a18d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 22 Sep 2021 16:07:21 +0100 Subject: [PATCH 08/15] Update parameter type for floatToGoString and clean up --- synapse/metrics/_exposition.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 5f83f99684e9..583a5b1b9997 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -39,15 +39,10 @@ CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8" -INF = float("inf") -MINUS_INF = float("-inf") - - -def floatToGoString(d: Any) -> str: - d = float(d) - if d == INF: +def floatToGoString(d: float) -> str: + if d == math.inf: return "+Inf" - elif d == MINUS_INF: + elif d == -math.inf: return "-Inf" elif math.isnan(d): return "NaN" From 7ab115378a4d5ea3c804f5ca5329c5b5e49dd451 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 22 Sep 2021 16:24:16 +0100 Subject: [PATCH 09/15] Improve wording of documentation surrounding `run_as_background_process` --- synapse/metrics/background_process_metrics.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index b63b15d79d84..0de4881fb899 
100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -203,6 +203,9 @@ def run_as_background_process( # type: ignore[misc] ... # The `type: ignore[misc]` above suppresses # "error: Overloaded function signatures 1 and 2 overlap with incompatible return types [misc]" + # + # Overloads are used instead of a `Union` here because mypy fails to infer the type of `R` + # and complains about almost every call to this function when `func` is a `Union`. @@ -287,19 +290,19 @@ async def run() -> Optional[R]: F = TypeVar("F", bound=Callable[..., Any]) -# NB: Return type is incorrect and should be a callable returning an F with a -# Deferred[Optional[R]] return, which we can't express correctly until Python 3.10. def wrap_as_background_process(desc: str) -> Callable[[F], F]: """Decorator that wraps a function that gets called as a background process. Equivalent to calling the function with `run_as_background_process`. - Note that `run_as_background_process` changes the return type into + Note that the annotated return type of this function is incorrect. + The return type of the function, once wrapped, is actually a `Deferred[Optional[T]]`. """ # NB: Return type is incorrect and should be F with a Deferred[Optional[R]] return + # We can't express the return types correctly until Python 3.10.
def wrap_as_background_process_inner(func: F) -> F: @wraps(func) def wrap_as_background_process_inner_2( From 77120c24640d0a2a7a621366827a5e42ab75ad60 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 22 Sep 2021 17:24:34 +0100 Subject: [PATCH 10/15] Improve typing of InFlightGauge --- synapse/metrics/__init__.py | 17 ++++++++++------- synapse/util/metrics.py | 12 +++++++++--- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a4e0b1678102..d40852b0a228 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -24,11 +24,13 @@ Any, Callable, Dict, + Generic, Iterable, Optional, Sequence, Set, Tuple, + Type, TypeVar, Union, cast, @@ -113,11 +115,12 @@ def _register(self) -> None: all_gauges[self.name] = self -# Placeholder for `InFlightGauge._metrics_class`, which is created dynamically -_MetricsEntry = Any +# `MetricsEntry` only makes sense when it is a `Protocol`, +# but `Protocol` can't be used as a `TypeVar` bound. +MetricsEntry = TypeVar("MetricsEntry") -class InFlightGauge: +class InFlightGauge(Generic[MetricsEntry]): """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. @@ -147,13 +150,13 @@ def __init__( # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. 
- self._metrics_class = attr.make_class( + self._metrics_class: Type[MetricsEntry] = attr.make_class( "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True ) # Counts number of in flight blocks for a given set of label values self._registrations: Dict[ - Tuple[str, ...], Set[Callable[[_MetricsEntry], None]] + Tuple[str, ...], Set[Callable[[MetricsEntry], None]] ] = {} # Protects access to _registrations @@ -164,7 +167,7 @@ def __init__( def register( self, key: Tuple[str, ...], - callback: Callable[[_MetricsEntry], None], + callback: Callable[[MetricsEntry], None], ) -> None: """Registers that we've entered a new block with labels `key`. @@ -184,7 +187,7 @@ def register( def unregister( self, key: Tuple[str, ...], - callback: Callable[[_MetricsEntry], None], + callback: Callable[[MetricsEntry], None], ) -> None: """Registers that we've exited a block with labels `key`.""" diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 1b82dca81b08..f637d6a455ea 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -14,7 +14,7 @@ import logging from functools import wraps -from typing import Any, Callable, Optional, TypeVar, cast +from typing import Any, Callable, Optional, Protocol, TypeVar, cast from prometheus_client import Counter @@ -53,8 +53,14 @@ "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"] ) + +class InFlightBlockMetrics(Protocol): + real_time_max: float + real_time_sum: float + + # Tracks the number of blocks currently active -in_flight = InFlightGauge( +in_flight = InFlightGauge[InFlightBlockMetrics]( "synapse_util_metrics_block_in_flight", "", labels=["block_name"], @@ -168,7 +174,7 @@ def get_resource_usage(self) -> ContextResourceUsage: """ return self._logging_context.get_resource_usage() - def _update_in_flight(self, metrics): + def _update_in_flight(self, metrics: InFlightBlockMetrics): """Gets called when processing in flight metrics""" duration = self.clock.time() - self.start 
From dba3f61e510cc3df421c82aa87187665b9a53282 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 22 Sep 2021 17:41:39 +0100 Subject: [PATCH 11/15] Fix Protocol import --- synapse/util/metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index f637d6a455ea..23677cd468ea 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -14,9 +14,10 @@ import logging from functools import wraps -from typing import Any, Callable, Optional, Protocol, TypeVar, cast +from typing import Any, Callable, Optional, TypeVar, cast from prometheus_client import Counter +from typing_extensions import Protocol from synapse.logging.context import ( ContextResourceUsage, From b483e31193083e8566e6869cca7b5acdb3ec5ced Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 22 Sep 2021 18:06:46 +0100 Subject: [PATCH 12/15] Fix regression in floatToGoString() when handling ints --- synapse/metrics/_exposition.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 583a5b1b9997..353d0a63b618 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -25,7 +25,7 @@ import threading from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Any, Dict, List, Type +from typing import Any, Dict, List, Type, Union from urllib.parse import parse_qs, urlparse from prometheus_client import REGISTRY, CollectorRegistry @@ -39,7 +39,8 @@ CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8" -def floatToGoString(d: float) -> str: +def floatToGoString(d: Union[int, float]) -> str: + d = float(d) if d == math.inf: return "+Inf" elif d == -math.inf: From 055dbde06803b94f1af22f89599a3ecfd5016bdd Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 13 Oct 2021 15:41:31 +0100 Subject: [PATCH 13/15] Fix typo --- 
synapse/metrics/background_process_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index b57f84290820..27e5cdd37dde 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -302,7 +302,7 @@ def wrap_as_background_process(desc: str) -> Callable[[F], F]: """ # NB: Return type is incorrect and should be F with a Deferred[Optional[R]] return - # We can't expression the return types correctly until Python 3.10. + # We can't express the return types correctly until Python 3.10. def wrap_as_background_process_inner(func: F) -> F: @wraps(func) def wrap_as_background_process_inner_2( From 6c2f682106bf64c97f5f5f73122a0b365d08060e Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 15 Nov 2021 15:38:20 +0000 Subject: [PATCH 14/15] Restrict `run_as_background_process` to `Awaitable`-returning functions --- synapse/app/_base.py | 2 +- synapse/handlers/typing.py | 2 +- synapse/metrics/background_process_metrics.py | 46 ++----------------- synapse/util/caches/expiringcache.py | 2 +- 4 files changed, 7 insertions(+), 45 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 573bb487b296..807ee3d46ef1 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -402,7 +402,7 @@ async def start(hs: "HomeServer") -> None: if hasattr(signal, "SIGHUP"): @wrap_as_background_process("sighup") - def handle_sighup(*args: Any, **kwargs: Any) -> None: + async def handle_sighup(*args: Any, **kwargs: Any) -> None: # Tell systemd our state, if we're using it. This will silently fail if # we're not using systemd. 
sdnotify(b"RELOADING=1") diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 22c6174821cf..1676ebd057c1 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -90,7 +90,7 @@ def _reset(self) -> None: self.wheel_timer = WheelTimer(bucket_size=5000) @wrap_as_background_process("typing._handle_timeouts") - def _handle_timeouts(self) -> None: + async def _handle_timeouts(self) -> None: logger.debug("Checking for typing timeouts") now = self.clock.time_msec() diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 27e5cdd37dde..53c508af9170 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -29,7 +29,6 @@ TypeVar, Union, cast, - overload, ) from prometheus_client import Metric @@ -47,7 +46,6 @@ noop_context_manager, start_active_span, ) -from synapse.util.async_helpers import maybe_awaitable if TYPE_CHECKING: import resource @@ -192,39 +190,9 @@ def update_metrics(self) -> None: R = TypeVar("R") -@overload -def run_as_background_process( # type: ignore[misc] - desc: str, - func: Callable[..., Awaitable[R]], - *args: Any, - bg_start_span: bool = True, - **kwargs: Any, -) -> "defer.Deferred[Optional[R]]": - ... - # The `type: ignore[misc]` above suppresses - # "error: Overloaded function signatures 1 and 2 overlap with incompatible return types [misc]" - # - # Overloads are used instead of a `Union` here because mypy fails to infer the type of `R` - # and complains about almost every call to this function when `func` is a `Union`. - - -@overload -def run_as_background_process( - desc: str, - func: Callable[..., R], - *args: Any, - bg_start_span: bool = True, - **kwargs: Any, -) -> "defer.Deferred[Optional[R]]": - ... 
- - def run_as_background_process( desc: str, - func: Union[ - Callable[..., Awaitable[R]], - Callable[..., R], - ], + func: Callable[..., Awaitable[Optional[R]]], *args: Any, bg_start_span: bool = True, **kwargs: Any, @@ -271,7 +239,7 @@ async def run() -> Optional[R]: else: ctx = noop_context_manager() with ctx: - return await maybe_awaitable(func(*args, **kwargs)) + return await func(*args, **kwargs) except Exception: logger.exception( "Background process '%s' threw an exception", @@ -287,22 +255,16 @@ async def run() -> Optional[R]: return defer.ensureDeferred(run()) -F = TypeVar("F", bound=Callable[..., Any]) +F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]]) def wrap_as_background_process(desc: str) -> Callable[[F], F]: """Decorator that wraps a function that gets called as a background process. - Equivalent to calling the function with `run_as_background_process`. - - Note that the annotated return type of this function is incorrect. - The return type of the function, once wrapped, is actually a - `Deferred[Optional[T]]`. + Equivalent to calling the function with `run_as_background_process` """ - # NB: Return type is incorrect and should be F with a Deferred[Optional[R]] return - # We can't express the return types correctly until Python 3.10. def wrap_as_background_process_inner(func: F) -> F: @wraps(func) def wrap_as_background_process_inner_2( diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index c3f72aa06de6..7290fb0745f0 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -157,7 +157,7 @@ def setdefault(self, key: KT, value: VT) -> VT: self[key] = value return value - def _prune_cache(self) -> None: + async def _prune_cache(self) -> None: if not self._expiry_ms: # zero expiry time means don't expire. This should never get called # since we have this check in start too. 
From 53ff8f5cdeb42ff29b08a8a5e0f8cad3f3f1101a Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 15 Nov 2021 15:43:06 +0000 Subject: [PATCH 15/15] slight style tweak --- synapse/util/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 42f6b5074911..2717ff29c4b1 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -63,7 +63,7 @@ class InFlightBlockMetrics(Protocol): # Tracks the number of blocks currently active -in_flight = InFlightGauge[InFlightBlockMetrics]( +in_flight: InFlightGauge[InFlightBlockMetrics] = InFlightGauge( "synapse_util_metrics_block_in_flight", "", labels=["block_name"],