This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add some metrics for inbound and outbound federation processing times #7755

Merged: 5 commits, merged on Jun 30, 2020
1 change: 1 addition & 0 deletions changelog.d/7755.misc
@@ -0,0 +1 @@
+Add some metrics for inbound and outbound federation latencies: `synapse_federation_server_pdu_process_time` and `synapse_event_processing_lag_by_event`.
37 changes: 21 additions & 16 deletions synapse/federation/federation_server.py
@@ -18,7 +18,7 @@
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union

from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram

from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -70,6 +70,10 @@
    "synapse_federation_server_received_queries", "", ["type"]
)

+pdu_process_time = Histogram(
+    "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+

class FederationServer(FederationBase):
    def __init__(self, hs):
@@ -271,21 +275,22 @@ async def process_pdus_for_room(room_id: str):

            for pdu in pdus_by_room[room_id]:
                event_id = pdu.event_id
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        pdu_results[event_id] = {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        pdu_results[event_id] = {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        pdu_results[event_id] = {"error": str(e)}
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),
-                        )
+                with pdu_process_time.time():
+                    with nested_logging_context(event_id):
+                        try:
+                            await self._handle_received_pdu(origin, pdu)
+                            pdu_results[event_id] = {}
+                        except FederationError as e:
+                            logger.warning("Error handling PDU %s: %s", event_id, e)
+                            pdu_results[event_id] = {"error": str(e)}
+                        except Exception as e:
+                            f = failure.Failure()
+                            pdu_results[event_id] = {"error": str(e)}
+                            logger.error(
+                                "Failed to handle PDU %s",
+                                event_id,
+                                exc_info=(f.type, f.value, f.getTracebackObject()),
+                            )

        await concurrently_execute(
            process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
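For reference, prometheus_client's Histogram.time() returns a context manager that starts a timer on entry and observes the elapsed wall-clock time, in seconds, when the block exits (even if it raises). A minimal sketch of the pattern used above, where handle_pdu and its sleeping body are hypothetical stand-ins:

import time

from prometheus_client import Histogram

pdu_process_time = Histogram(
    "synapse_federation_server_pdu_process_time", "Time taken to process an event",
)

def handle_pdu(pdu):
    # The timer covers everything inside the block, including the
    # nested logging context and error handling in the real code.
    with pdu_process_time.time():
        time.sleep(0.01)  # hypothetical stand-in for real PDU handling

handle_pdu(object())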
10 changes: 9 additions & 1 deletion synapse/federation/sender/__init__.py
@@ -201,7 +201,15 @@ async def handle_event(event: EventBase) -> None:

            logger.debug("Sending %s to %r", event, destinations)

-            self._send_pdu(event, destinations)
+            if destinations:
+                self._send_pdu(event, destinations)
+
+            now = self.clock.time_msec()
+            ts = await self.store.get_received_ts(event.event_id)
Member:

Is there any concern about how much latency this will add by hitting the database again?

Member Author:

Yes, though I think in this case it's a very simple query and will be dwarfed by the complexity of the get_hosts_in_room query above.

Member:

Alright. 👍

+            synapse.metrics.event_processing_lag_by_event.labels(
+                "federation_sender"
+            ).observe(now - ts)

        async def handle_room_events(events: Iterable[EventBase]) -> None:
            with Measure(self.clock, "handle_room_events"):
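Note the units: clock.time_msec() and get_received_ts() both appear to deal in milliseconds since the epoch, so this histogram observes lag in milliseconds, whereas pdu_process_time above is recorded in seconds by Histogram.time(). A minimal sketch under that assumption, with record_send_lag as a hypothetical helper:

import time

from prometheus_client import Histogram

event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up to be "
    "sent to the relevant remote servers",
    ["name"],
)

def record_send_lag(received_ts_ms: int) -> None:
    # Both timestamps are in milliseconds, so the observed value is the
    # persist-to-queue lag in milliseconds.
    now_ms = int(time.time() * 1000)
    event_processing_lag_by_event.labels("federation_sender").observe(
        now_ms - received_ts_ms
    )

record_send_lag(int(time.time() * 1000) - 250)  # e.g. persisted 250 ms ago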
6 changes: 6 additions & 0 deletions synapse/handlers/appservice.py
@@ -114,6 +114,12 @@ def start_scheduler():

                    for service in services:
                        self.scheduler.submit_event_for_as(service, event)

+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(event.event_id)
+                    synapse.metrics.event_processing_lag_by_event.labels(
+                        "appservice_sender"
+                    ).observe(now - ts)
+
                @defer.inlineCallbacks
                def handle_room_events(events):
                    for event in events:
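This handler still uses Twisted's inlineCallbacks style rather than async/await, which is why the timestamp lookup is written with yield here but with await in the federation sender. A minimal sketch of that pattern, where the store argument is a hypothetical object whose get_received_ts returns a Deferred:

from twisted.internet import defer

@defer.inlineCallbacks
def lookup_received_ts(store, event_id):
    # inlineCallbacks turns a generator into a function returning a
    # Deferred: each `yield` suspends until the yielded Deferred fires,
    # much like `await` in the async/await style used elsewhere.
    ts = yield store.get_received_ts(event_id)
    return ts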
6 changes: 6 additions & 0 deletions synapse/metrics/__init__.py
@@ -463,6 +463,12 @@ def collect(self):

# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])

+event_processing_lag_by_event = Histogram(
+    "synapse_event_processing_lag_by_event",
+    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+    ["name"],
+)
+
# Build info of the running server.
build_info = Gauge(
    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
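Unlike the existing event_processing_lag Gauge, a labelled Histogram exports per-label _bucket, _sum and _count series, which is what allows the per-event lag to be aggregated into quantiles. A self-contained sketch using a private CollectorRegistry so it can run standalone:

from prometheus_client import CollectorRegistry, Histogram, generate_latest

registry = CollectorRegistry()
lag = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up to be "
    "sent to the relevant remote servers",
    ["name"],
    registry=registry,
)

# Each processor reports under its own label, so the federation sender
# and the appservice sender can be compared on the same metric.
lag.labels("federation_sender").observe(42.0)
lag.labels("appservice_sender").observe(7.0)

# Prints the _bucket/_sum/_count series a Prometheus scrape would see.
print(generate_latest(registry).decode())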