Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type annotations to synapse.metrics #10847

Merged
merged 19 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10847.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type annotations to `synapse.metrics`.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ files =
[mypy-synapse.handlers.*]
disallow_untyped_defs = True

[mypy-synapse.metrics.*]
disallow_untyped_defs = True
squahtx marked this conversation as resolved.
Show resolved Hide resolved

[mypy-synapse.push.*]
disallow_untyped_defs = True

Expand Down
4 changes: 3 additions & 1 deletion synapse/groups/attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

from signedjson.sign import sign_json

from twisted.internet.defer import Deferred

from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, get_domain_from_id
Expand Down Expand Up @@ -166,7 +168,7 @@ async def on_renew_attestation(

return {}

def _start_renew_attestations(self) -> None:
def _start_renew_attestations(self) -> "Deferred[None]":
return run_as_background_process("renew_attestations", self._renew_attestations)

async def _renew_attestations(self) -> None:
Expand Down
100 changes: 69 additions & 31 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,24 @@
import platform
import threading
import time
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
Expand All @@ -32,6 +46,7 @@
)

from twisted.internet import reactor
from twisted.internet.base import ReactorBase

import synapse
from synapse.metrics._exposition import (
Expand All @@ -53,7 +68,7 @@

class RegistryProxy:
@staticmethod
def collect():
def collect() -> Iterable[Metric]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think this needs to be a Generator, but maybe not?

This is true for a variety of spots we use yield.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Or maybe I can significantly simplify type hints in other spots!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted for using the most basic type that callers want, to minimize assumptions, rather than documenting the concrete return type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All Generators are Iterators (and all Iterators are Iterable), and in this case this seems cleaner than saying a Generator[Metric, None, None].

for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
Expand All @@ -69,7 +84,7 @@ class LaterGauge:
# or dict mapping from a label tuple to a value
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])

def collect(self):
def collect(self) -> Iterable[Metric]:

g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

Expand All @@ -88,10 +103,10 @@ def collect(self):

yield g

def __attrs_post_init__(self):
def __attrs_post_init__(self) -> None:
self._register()

def _register(self):
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
Expand All @@ -100,7 +115,12 @@ def _register(self):
all_gauges[self.name] = self


class InFlightGauge:
# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")


class InFlightGauge(Generic[MetricsEntry]):
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.

Expand All @@ -110,34 +130,45 @@ class InFlightGauge:
callbacks.

Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
name
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
"""

def __init__(self, name, desc, labels, sub_metrics):
def __init__(
self,
name: str,
desc: str,
labels: Sequence[str],
sub_metrics: Sequence[str],
):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics

# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
self._metrics_class: Type[MetricsEntry] = attr.make_class(
"_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)

# Counts number of in flight blocks for a given set of label values
self._registrations: Dict = {}
self._registrations: Dict[
Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
] = {}

# Protects access to _registrations
self._lock = threading.Lock()

self._register_with_collector()

def register(self, key, callback):
def register(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""Registers that we've entered a new block with labels `key`.

`callback` gets called each time the metrics are collected. The same
Expand All @@ -153,13 +184,17 @@ def register(self, key, callback):
with self._lock:
self._registrations.setdefault(key, set()).add(callback)

def unregister(self, key, callback):
def unregister(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""Registers that we've exited a block with labels `key`."""

with self._lock:
self._registrations.setdefault(key, set()).discard(callback)

def collect(self):
def collect(self) -> Iterable[Metric]:
"""Called by prometheus client when it reads metrics.

Note: may be called by a separate thread.
Expand Down Expand Up @@ -195,7 +230,7 @@ def collect(self):
gauge.add_metric(key, getattr(metrics, name))
yield gauge

def _register_with_collector(self):
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
Expand Down Expand Up @@ -225,7 +260,7 @@ def __init__(
name: str,
documentation: str,
buckets: Iterable[float],
registry=REGISTRY,
registry: CollectorRegistry = REGISTRY,
):
"""
Args:
Expand All @@ -252,12 +287,12 @@ def __init__(

registry.register(self)

def collect(self):
def collect(self) -> Iterable[Metric]:
# Don't report metrics unless we've already collected some data
if self._metric is not None:
yield self._metric

def update_data(self, values: Iterable[float]):
def update_data(self, values: Iterable[float]) -> None:
"""Update the data to be reported by the metric

The existing data is cleared, and each measurement in the input is assigned
Expand Down Expand Up @@ -299,7 +334,7 @@ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFami


class CPUMetrics:
def __init__(self):
def __init__(self) -> None:
ticks_per_sec = 100
try:
# Try and get the system config
Expand All @@ -309,7 +344,7 @@ def __init__(self):

self.ticks_per_sec = ticks_per_sec

def collect(self):
def collect(self) -> Iterable[Metric]:
if not HAVE_PROC_SELF_STAT:
return
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -359,7 +394,7 @@ def collect(self):


class GCCounts:
def collect(self):
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
Expand All @@ -377,7 +412,7 @@ def collect(self):


class PyPyGCStats:
def collect(self):
def collect(self) -> Iterable[Metric]:

# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
Expand Down Expand Up @@ -524,7 +559,7 @@ def collect(self):


class ReactorLastSeenMetric:
def collect(self):
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
Expand All @@ -543,9 +578,12 @@ def collect(self):
_last_gc = [0.0, 0.0, 0.0]


def runUntilCurrentTimer(reactor, func):
F = TypeVar("F", bound=Callable[..., Any])


def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

@functools.wraps(func)
def f(*args, **kwargs):
def f(*args: Any, **kwargs: Any) -> Any:
now = reactor.seconds()
num_pending = 0

Expand Down Expand Up @@ -608,7 +646,7 @@ def f(*args, **kwargs):

return ret

return f
return cast(F, f)


try:
Expand Down Expand Up @@ -636,5 +674,5 @@ def f(*args, **kwargs):
"start_http_server",
"LaterGauge",
"InFlightGauge",
"BucketCollector",
"GaugeBucketCollector",
]
34 changes: 17 additions & 17 deletions synapse/metrics/_exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,25 @@
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Dict, List
from typing import Any, Dict, List, Type, Union
from urllib.parse import parse_qs, urlparse

from prometheus_client import REGISTRY
from prometheus_client import REGISTRY, CollectorRegistry
from prometheus_client.core import Sample

from twisted.web.resource import Resource
from twisted.web.server import Request

from synapse.util import caches

CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"


INF = float("inf")
MINUS_INF = float("-inf")


def floatToGoString(d):
def floatToGoString(d: Union[int, float]) -> str:
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
d = float(d)
if d == INF:
if d == math.inf:
return "+Inf"
elif d == MINUS_INF:
elif d == -math.inf:
return "-Inf"
elif math.isnan(d):
return "NaN"
Expand All @@ -60,7 +58,7 @@ def floatToGoString(d):
return s


def sample_line(line, name):
def sample_line(line: Sample, name: str) -> str:
if line.labels:
labelstr = "{{{0}}}".format(
",".join(
Expand All @@ -82,7 +80,7 @@ def sample_line(line, name):
return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp)


def generate_latest(registry, emit_help=False):
def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> bytes:

# Trigger the cache metrics to be rescraped, which updates the common
# metrics but do not produce metrics themselves
Expand Down Expand Up @@ -187,7 +185,7 @@ class MetricsHandler(BaseHTTPRequestHandler):

registry = REGISTRY

def do_GET(self):
def do_GET(self) -> None:
registry = self.registry
params = parse_qs(urlparse(self.path).query)

Expand All @@ -207,11 +205,11 @@ def do_GET(self):
self.end_headers()
self.wfile.write(output)

def log_message(self, format, *args):
def log_message(self, format: str, *args: Any) -> None:
"""Log nothing."""

@classmethod
def factory(cls, registry):
def factory(cls, registry: CollectorRegistry) -> Type:
"""Returns a dynamic MetricsHandler class tied
to the passed registry.
"""
Expand All @@ -236,7 +234,9 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
daemon_threads = True


def start_http_server(port, addr="", registry=REGISTRY):
def start_http_server(
port: int, addr: str = "", registry: CollectorRegistry = REGISTRY
) -> None:
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
CustomMetricsHandler = MetricsHandler.factory(registry)
httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
Expand All @@ -252,10 +252,10 @@ class MetricsResource(Resource):

isLeaf = True

def __init__(self, registry=REGISTRY):
def __init__(self, registry: CollectorRegistry = REGISTRY):
self.registry = registry

def render_GET(self, request):
def render_GET(self, request: Request) -> bytes:
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
response = generate_latest(self.registry)
request.setHeader(b"Content-Length", str(len(response)))
Expand Down
Loading