Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add type annotations to synapse.metrics (#10847)
Browse files Browse the repository at this point in the history
  • Loading branch information
squahtx authored Nov 17, 2021
1 parent d993c3b commit 84fac0f
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 85 deletions.
1 change: 1 addition & 0 deletions changelog.d/10847.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type annotations to `synapse.metrics`.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ disallow_untyped_defs = True
[mypy-synapse.handlers.*]
disallow_untyped_defs = True

[mypy-synapse.metrics.*]
disallow_untyped_defs = True

[mypy-synapse.push.*]
disallow_untyped_defs = True

Expand Down
2 changes: 1 addition & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ async def start(hs: "HomeServer") -> None:
if hasattr(signal, "SIGHUP"):

@wrap_as_background_process("sighup")
def handle_sighup(*args: Any, **kwargs: Any) -> None:
async def handle_sighup(*args: Any, **kwargs: Any) -> None:
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")
Expand Down
4 changes: 3 additions & 1 deletion synapse/groups/attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

from signedjson.sign import sign_json

from twisted.internet.defer import Deferred

from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, get_domain_from_id
Expand Down Expand Up @@ -166,7 +168,7 @@ async def on_renew_attestation(

return {}

def _start_renew_attestations(self) -> None:
def _start_renew_attestations(self) -> "Deferred[None]":
return run_as_background_process("renew_attestations", self._renew_attestations)

async def _renew_attestations(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _reset(self) -> None:
self.wheel_timer = WheelTimer(bucket_size=5000)

@wrap_as_background_process("typing._handle_timeouts")
def _handle_timeouts(self) -> None:
async def _handle_timeouts(self) -> None:
logger.debug("Checking for typing timeouts")

now = self.clock.time_msec()
Expand Down
101 changes: 70 additions & 31 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,25 @@
import platform
import threading
import time
from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
Expand All @@ -32,6 +47,7 @@
)

from twisted.internet import reactor
from twisted.internet.base import ReactorBase
from twisted.python.threadpool import ThreadPool

import synapse
Expand All @@ -54,7 +70,7 @@

class RegistryProxy:
@staticmethod
def collect():
def collect() -> Iterable[Metric]:
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
Expand All @@ -74,7 +90,7 @@ class LaterGauge:
]
)

def collect(self):
def collect(self) -> Iterable[Metric]:

g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

Expand All @@ -93,10 +109,10 @@ def collect(self):

yield g

def __attrs_post_init__(self):
def __attrs_post_init__(self) -> None:
self._register()

def _register(self):
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
Expand All @@ -105,7 +121,12 @@ def _register(self):
all_gauges[self.name] = self


class InFlightGauge:
# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")


class InFlightGauge(Generic[MetricsEntry]):
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Expand All @@ -115,34 +136,45 @@ class InFlightGauge:
callbacks.
Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
name
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
"""

def __init__(self, name, desc, labels, sub_metrics):
def __init__(
self,
name: str,
desc: str,
labels: Sequence[str],
sub_metrics: Sequence[str],
):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics

# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
self._metrics_class: Type[MetricsEntry] = attr.make_class(
"_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)

# Counts number of in flight blocks for a given set of label values
self._registrations: Dict = {}
self._registrations: Dict[
Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
] = {}

# Protects access to _registrations
self._lock = threading.Lock()

self._register_with_collector()

def register(self, key, callback):
def register(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
Expand All @@ -158,13 +190,17 @@ def register(self, key, callback):
with self._lock:
self._registrations.setdefault(key, set()).add(callback)

def unregister(self, key, callback):
def unregister(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""Registers that we've exited a block with labels `key`."""

with self._lock:
self._registrations.setdefault(key, set()).discard(callback)

def collect(self):
def collect(self) -> Iterable[Metric]:
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
Expand Down Expand Up @@ -200,7 +236,7 @@ def collect(self):
gauge.add_metric(key, getattr(metrics, name))
yield gauge

def _register_with_collector(self):
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
Expand Down Expand Up @@ -230,7 +266,7 @@ def __init__(
name: str,
documentation: str,
buckets: Iterable[float],
registry=REGISTRY,
registry: CollectorRegistry = REGISTRY,
):
"""
Args:
Expand All @@ -257,12 +293,12 @@ def __init__(

registry.register(self)

def collect(self):
def collect(self) -> Iterable[Metric]:
# Don't report metrics unless we've already collected some data
if self._metric is not None:
yield self._metric

def update_data(self, values: Iterable[float]):
def update_data(self, values: Iterable[float]) -> None:
"""Update the data to be reported by the metric
The existing data is cleared, and each measurement in the input is assigned
Expand Down Expand Up @@ -304,7 +340,7 @@ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFami


class CPUMetrics:
def __init__(self):
def __init__(self) -> None:
ticks_per_sec = 100
try:
# Try and get the system config
Expand All @@ -314,7 +350,7 @@ def __init__(self):

self.ticks_per_sec = ticks_per_sec

def collect(self):
def collect(self) -> Iterable[Metric]:
if not HAVE_PROC_SELF_STAT:
return

Expand Down Expand Up @@ -364,7 +400,7 @@ def collect(self):


class GCCounts:
def collect(self):
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
Expand All @@ -382,7 +418,7 @@ def collect(self):


class PyPyGCStats:
def collect(self):
def collect(self) -> Iterable[Metric]:

# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
Expand Down Expand Up @@ -565,7 +601,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None:


class ReactorLastSeenMetric:
def collect(self):
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
Expand All @@ -584,9 +620,12 @@ def collect(self):
_last_gc = [0.0, 0.0, 0.0]


def runUntilCurrentTimer(reactor, func):
F = TypeVar("F", bound=Callable[..., Any])


def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
@functools.wraps(func)
def f(*args, **kwargs):
def f(*args: Any, **kwargs: Any) -> Any:
now = reactor.seconds()
num_pending = 0

Expand Down Expand Up @@ -649,7 +688,7 @@ def f(*args, **kwargs):

return ret

return f
return cast(F, f)


try:
Expand Down Expand Up @@ -677,5 +716,5 @@ def f(*args, **kwargs):
"start_http_server",
"LaterGauge",
"InFlightGauge",
"BucketCollector",
"GaugeBucketCollector",
]
Loading

0 comments on commit 84fac0f

Please sign in to comment.