Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3090 from matrix-org/erikj/processed_event_lag
Browse files Browse the repository at this point in the history
Add metrics for event processing lag
  • Loading branch information
erikjohnston authored Apr 12, 2018
2 parents 2611243 + 415aeef commit 0f13f30
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 5 deletions.
19 changes: 17 additions & 2 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,27 @@ def handle_room_events(events):
consumeErrors=True
))

events_processed_counter.inc_by(len(events))

yield self.store.update_federation_out_pos(
"events", next_token
)

if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)

synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)

events_processed_counter.inc_by(len(events))

synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)

finally:
self._is_processing = False

Expand Down
16 changes: 15 additions & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,23 @@ def handle_room_events(events):
for evs in events_by_room.itervalues()
], consumeErrors=True))

yield self.store.set_appservice_last_pos(upper_bound)

now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)

synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)

events_processed_counter.inc_by(len(events))

yield self.store.set_appservice_last_pos(upper_bound)
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
finally:
self.is_processing = False

Expand Down
35 changes: 34 additions & 1 deletion synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric,
MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector

Expand Down Expand Up @@ -65,6 +65,13 @@ def register_counter(self, *args, **kwargs):
"""
return self._register(CounterMetric, *args, **kwargs)

def register_gauge(self, *args, **kwargs):
    """Register a new GaugeMetric in this metrics namespace.

    All positional and keyword arguments are forwarded unchanged to the
    GaugeMetric constructor via _register (typically the metric name and,
    optionally, its labels).

    Returns:
        GaugeMetric
    """
    return self._register(GaugeMetric, *args, **kwargs)

def register_callback(self, *args, **kwargs):
"""
Returns:
Expand Down Expand Up @@ -144,6 +151,32 @@ def render_all():
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")

# Metrics in the top-level "synapse" namespace, used to observe how far the
# various event-stream consumers have progressed.
synapse_metrics = get_metrics_for("synapse")

# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc. Labelled by the name of
# the processing component.
event_processing_positions = synapse_metrics.register_gauge(
    "event_processing_positions", labels=["name"],
)

# Used to track the current max events stream position (i.e. the stream
# ordering of the most recently persisted event)
event_persisted_position = synapse_metrics.register_gauge(
    "event_persisted_position",
)

# Used to track the received_ts (in milliseconds) of the last event processed
# by various components. Labelled by component name.
event_processing_last_ts = synapse_metrics.register_gauge(
    "event_processing_last_ts", labels=["name"],
)

# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed. Labelled by component name.
event_processing_lag = synapse_metrics.register_gauge(
    "event_processing_lag", labels=["name"],
)


def runUntilCurrentTimer(func):

Expand Down
32 changes: 31 additions & 1 deletion synapse/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __init__(self, *args, **kwargs):
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty list).
# (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}

# Scalar metrics are never empty
Expand Down Expand Up @@ -145,6 +145,36 @@ def render(self):
)


class GaugeMetric(BaseMetric):
    """A metric that can go up or down.

    Unlike a counter, a gauge is not monotonic: each call to set()
    overwrites the current value for the given label combination.
    """

    def __init__(self, *args, **kwargs):
        super(GaugeMetric, self).__init__(*args, **kwargs)

        # dict[tuple[str], value]: value for each set of label values. the
        # keys are the label values, in the same order as the labels in
        # self.labels.
        #
        # (if the metric is a scalar, the (single) key is the empty tuple).
        #
        # NB: fixed the "guages" misspelling of the original attribute name;
        # it is only referenced by set() and render() below.
        self.gauges = {}

    def set(self, v, *values):
        """Set the gauge for the given label values to v.

        Args:
            v: the new value for the gauge
            *values: one label value per label declared at registration time

        Raises:
            ValueError: if the number of label values does not match the
                number of declared labels
        """
        if len(values) != self.dimension():
            # Original message referred to inc(); this method is set().
            raise ValueError(
                "Expected as many values to set() as labels (%d)" % (self.dimension())
            )

        # TODO: should assert that the tag values are all strings

        self.gauges[values] = v

    def render(self):
        # Emit one rendered line per label combination, in a stable
        # (sorted) order so output is deterministic.
        return flatten(
            self._render_for_labels(k, self.gauges[k])
            for k in sorted(self.gauges.keys())
        )


class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
Expand Down
3 changes: 3 additions & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ def _persist_events(self, events_and_contexts, backfilled=False,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
for event, context in chunk:
if context.app_service:
origin_type = "local"
Expand Down
20 changes: 20 additions & 0 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@


class EventsWorkerStore(SQLBaseStore):
def get_received_ts(self, event_id):
    """Look up when the given event was persisted (its received_ts).

    Raises an exception for unknown events.

    Args:
        event_id (str): the ID of the event to look up

    Returns:
        Deferred[int|None]: timestamp in milliseconds, or None for
            events that were persisted before received_ts was implemented.
    """
    lookup_keys = {"event_id": event_id}
    return self._simple_select_one_onecol(
        table="events",
        keyvalues=lookup_keys,
        retcol="received_ts",
        desc="get_received_ts",
    )

@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def setUp(self):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store)
self.mock_store.get_received_ts.return_value = 0
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
hs.get_clock.return_value = MockClock()
Expand Down

0 comments on commit 0f13f30

Please sign in to comment.