Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add some metrics to staging area (#10284)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Jul 1, 2021
1 parent 04c8f30 commit 76addad
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/10284.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add metrics for new inbound federation staging area.
39 changes: 39 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

from prometheus_client import Gauge

from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion
Expand All @@ -32,6 +34,16 @@
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter

oldest_pdu_in_federation_staging = Gauge(
"synapse_federation_server_oldest_inbound_pdu_in_staging",
"The age in seconds since we received the oldest pdu in the federation staging area",
)

number_pdus_in_federation_queue = Gauge(
"synapse_federation_server_number_inbound_pdu_in_staging",
"The total number of events in the inbound federation staging",
)

logger = logging.getLogger(__name__)


Expand All @@ -54,6 +66,8 @@ def __init__(self, database: DatabasePool, db_conn, hs):
500000, "_event_auth_cache", size_callback=len
) # type: LruCache[str, List[Tuple[str, int]]]

self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)

async def get_auth_chain(
self, room_id: str, event_ids: Collection[str], include_given: bool = False
) -> List[EventBase]:
Expand Down Expand Up @@ -1193,6 +1207,31 @@ def _get_next_staged_event_for_room_txn(txn):

return origin, event

@wrap_as_background_process("_get_stats_for_federation_staging")
async def _get_stats_for_federation_staging(self):
"""Update the prometheus metrics for the inbound federation staging area."""

def _get_stats_for_federation_staging_txn(txn):
txn.execute(
"SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
)
(count,) = txn.fetchone()

txn.execute(
"SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
)

(age,) = txn.fetchone()

return count, age

count, age = await self.db_pool.runInteraction(
"_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
)

number_pdus_in_federation_queue.set(count)
oldest_pdu_in_federation_staging.set(age)


class EventFederationStore(EventFederationWorkerStore):
"""Responsible for storing and serving up the various graphs associated
Expand Down

0 comments on commit 76addad

Please sign in to comment.