Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add type annotations to synapse.metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean Quah committed Sep 20, 2021
1 parent 934fe41 commit f058e4f
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 60 deletions.
1 change: 1 addition & 0 deletions changelog.d/10847.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type annotations to `synapse.metrics`.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ files =
tests/util/test_itertools.py,
tests/util/test_stream_change_cache.py

[mypy-synapse.metrics.*]
disallow_untyped_defs = True

[mypy-synapse.rest.*]
disallow_untyped_defs = True

Expand Down
93 changes: 64 additions & 29 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,22 @@
import platform
import threading
import time
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
cast,
)

import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
Expand All @@ -32,6 +44,7 @@
)

from twisted.internet import reactor
from twisted.internet.base import ReactorBase

import synapse
from synapse.metrics._exposition import (
Expand All @@ -53,7 +66,7 @@

class RegistryProxy:
@staticmethod
def collect():
def collect() -> Iterable[Metric]:
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
Expand All @@ -69,7 +82,7 @@ class LaterGauge:
# or dict mapping from a label tuple to a value
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])

def collect(self):
def collect(self) -> Iterable[Metric]:

g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

Expand All @@ -88,10 +101,10 @@ def collect(self):

yield g

def __attrs_post_init__(self):
def __attrs_post_init__(self) -> None:
self._register()

def _register(self):
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
Expand All @@ -100,6 +113,10 @@ def _register(self):
all_gauges[self.name] = self


# Placeholder for `InFlightGauge._metrics_class`, which is created dynamically
_MetricsEntry = Any


class InFlightGauge:
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Expand All @@ -110,14 +127,19 @@ class InFlightGauge:
callbacks.
Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
name
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
"""

def __init__(self, name, desc, labels, sub_metrics):
def __init__(
self,
name: str,
desc: str,
labels: Sequence[str],
sub_metrics: Sequence[str],
):
self.name = name
self.desc = desc
self.labels = labels
Expand All @@ -130,14 +152,20 @@ def __init__(self, name, desc, labels, sub_metrics):
)

# Counts number of in flight blocks for a given set of label values
self._registrations: Dict = {}
self._registrations: Dict[
Tuple[str, ...], Set[Callable[[_MetricsEntry], None]]
] = {}

# Protects access to _registrations
self._lock = threading.Lock()

self._register_with_collector()

def register(self, key, callback):
def register(
self,
key: Tuple[str, ...],
callback: Callable[[_MetricsEntry], None],
) -> None:
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
Expand All @@ -153,13 +181,17 @@ def register(self, key, callback):
with self._lock:
self._registrations.setdefault(key, set()).add(callback)

def unregister(self, key, callback):
def unregister(
self,
key: Tuple[str, ...],
callback: Callable[[_MetricsEntry], None],
) -> None:
"""Registers that we've exited a block with labels `key`."""

with self._lock:
self._registrations.setdefault(key, set()).discard(callback)

def collect(self):
def collect(self) -> Iterable[Metric]:
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
Expand Down Expand Up @@ -195,7 +227,7 @@ def collect(self):
gauge.add_metric(key, getattr(metrics, name))
yield gauge

def _register_with_collector(self):
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
Expand Down Expand Up @@ -225,7 +257,7 @@ def __init__(
name: str,
documentation: str,
buckets: Iterable[float],
registry=REGISTRY,
registry: CollectorRegistry = REGISTRY,
):
"""
Args:
Expand All @@ -252,12 +284,12 @@ def __init__(

registry.register(self)

def collect(self):
def collect(self) -> Iterable[Metric]:
# Don't report metrics unless we've already collected some data
if self._metric is not None:
yield self._metric

def update_data(self, values: Iterable[float]):
def update_data(self, values: Iterable[float]) -> None:
"""Update the data to be reported by the metric
The existing data is cleared, and each measurement in the input is assigned
Expand Down Expand Up @@ -299,7 +331,7 @@ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFami


class CPUMetrics:
def __init__(self):
def __init__(self) -> None:
ticks_per_sec = 100
try:
# Try and get the system config
Expand All @@ -309,7 +341,7 @@ def __init__(self):

self.ticks_per_sec = ticks_per_sec

def collect(self):
def collect(self) -> Iterable[Metric]:
if not HAVE_PROC_SELF_STAT:
return

Expand Down Expand Up @@ -359,7 +391,7 @@ def collect(self):


class GCCounts:
def collect(self):
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
Expand All @@ -377,7 +409,7 @@ def collect(self):


class PyPyGCStats:
def collect(self):
def collect(self) -> Iterable[Metric]:

# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
Expand Down Expand Up @@ -524,7 +556,7 @@ def collect(self):


class ReactorLastSeenMetric:
def collect(self):
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
Expand All @@ -543,9 +575,12 @@ def collect(self):
_last_gc = [0.0, 0.0, 0.0]


def runUntilCurrentTimer(reactor, func):
F = TypeVar("F", bound=Callable[..., Any])


def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
@functools.wraps(func)
def f(*args, **kwargs):
def f(*args: Any, **kwargs: Any) -> Any:
now = reactor.seconds()
num_pending = 0

Expand Down Expand Up @@ -608,7 +643,7 @@ def f(*args, **kwargs):

return ret

return f
return cast(F, f)


try:
Expand Down Expand Up @@ -636,5 +671,5 @@ def f(*args, **kwargs):
"start_http_server",
"LaterGauge",
"InFlightGauge",
"BucketCollector",
"GaugeBucketCollector",
]
26 changes: 15 additions & 11 deletions synapse/metrics/_exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Dict, List
from typing import Any, Dict, List, Type
from urllib.parse import parse_qs, urlparse

from prometheus_client import REGISTRY
from prometheus_client import REGISTRY, CollectorRegistry
from prometheus_client.core import Sample

from twisted.web.resource import Resource
from twisted.web.server import Request

from synapse.util import caches

Expand All @@ -41,7 +43,7 @@
MINUS_INF = float("-inf")


def floatToGoString(d):
def floatToGoString(d: Any) -> str:
d = float(d)
if d == INF:
return "+Inf"
Expand All @@ -60,7 +62,7 @@ def floatToGoString(d):
return s


def sample_line(line, name):
def sample_line(line: Sample, name: str) -> str:
if line.labels:
labelstr = "{{{0}}}".format(
",".join(
Expand All @@ -82,7 +84,7 @@ def sample_line(line, name):
return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp)


def generate_latest(registry, emit_help=False):
def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> bytes:

# Trigger the cache metrics to be rescraped, which updates the common
# metrics but do not produce metrics themselves
Expand Down Expand Up @@ -187,7 +189,7 @@ class MetricsHandler(BaseHTTPRequestHandler):

registry = REGISTRY

def do_GET(self):
def do_GET(self) -> None:
registry = self.registry
params = parse_qs(urlparse(self.path).query)

Expand All @@ -207,11 +209,11 @@ def do_GET(self):
self.end_headers()
self.wfile.write(output)

def log_message(self, format, *args):
def log_message(self, format: str, *args: Any) -> None:
"""Log nothing."""

@classmethod
def factory(cls, registry):
def factory(cls, registry: CollectorRegistry) -> Type:
"""Returns a dynamic MetricsHandler class tied
to the passed registry.
"""
Expand All @@ -236,7 +238,9 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
daemon_threads = True


def start_http_server(port, addr="", registry=REGISTRY):
def start_http_server(
port: int, addr: str = "", registry: CollectorRegistry = REGISTRY
) -> None:
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
CustomMetricsHandler = MetricsHandler.factory(registry)
httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
Expand All @@ -252,10 +256,10 @@ class MetricsResource(Resource):

isLeaf = True

def __init__(self, registry=REGISTRY):
def __init__(self, registry: CollectorRegistry = REGISTRY):
self.registry = registry

def render_GET(self, request):
def render_GET(self, request: Request) -> bytes:
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
response = generate_latest(self.registry)
request.setHeader(b"Content-Length", str(len(response)))
Expand Down
Loading

0 comments on commit f058e4f

Please sign in to comment.