From 79644f7c4d38b92c62d0e88a5c68f2231574e86b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2020 14:40:47 +0000 Subject: [PATCH] Remove concept of a non-limited stream. (#7011) --- changelog.d/7011.misc | 1 + synapse/handlers/presence.py | 4 +- synapse/handlers/typing.py | 11 +++- synapse/replication/tcp/resource.py | 9 +-- synapse/replication/tcp/streams/_base.py | 66 ++++++++----------- synapse/storage/data_stores/main/devices.py | 10 ++- .../data_stores/main/end_to_end_keys.py | 14 ++-- synapse/storage/data_stores/main/presence.py | 23 ++++--- 8 files changed, 71 insertions(+), 67 deletions(-) create mode 100644 changelog.d/7011.misc diff --git a/changelog.d/7011.misc b/changelog.d/7011.misc new file mode 100644 index 000000000000..41c3b37574fa --- /dev/null +++ b/changelog.d/7011.misc @@ -0,0 +1 @@ +Remove concept of a non-limited stream. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 5526015ddb2b..6912165622ec 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -747,7 +747,7 @@ async def is_visible(self, observed_user, observer_user): return False - async def get_all_presence_updates(self, last_id, current_id): + async def get_all_presence_updates(self, last_id, current_id, limit): """ Gets a list of presence update rows from between the given stream ids. Each row has: @@ -762,7 +762,7 @@ async def get_all_presence_updates(self, last_id, current_id): """ # TODO(markjh): replicate the unpersisted changes. # This could use the in-memory stores for recent changes. - rows = await self.store.get_all_presence_updates(last_id, current_id) + rows = await self.store.get_all_presence_updates(last_id, current_id, limit) return rows def notify_new_event(self): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 391bceb0c483..c7bc14c6234c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -15,6 +15,7 @@ import logging from collections import namedtuple +from typing import List from twisted.internet import defer @@ -257,7 +258,13 @@ def _push_update_local(self, member, typing): "typing_key", self._latest_room_serial, rooms=[member.room_id] ) - async def get_all_typing_updates(self, last_id, current_id): + async def get_all_typing_updates( + self, last_id: int, current_id: int, limit: int + ) -> List[dict]: + """Get up to `limit` typing updates between the given tokens, earliest + updates first. + """ + if last_id == current_id: return [] @@ -275,7 +282,7 @@ async def get_all_typing_updates(self, last_id, current_id): typing = self._room_typing[room_id] rows.append((serial, room_id, list(typing))) rows.sort() - return rows + return rows[:limit] def get_current_token(self): return self._latest_room_serial diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index ce9d1fae129d..6e2ebaf614d7 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -166,11 +166,6 @@ async def _run_notifier_loop(self): self.pending_updates = False with Measure(self.clock, "repl.stream.get_updates"): - # First we tell the streams that they should update their - # current tokens. - for stream in self.streams: - stream.advance_current_token() - all_streams = self.streams if self._replication_torture_level is not None: @@ -180,7 +175,7 @@ async def _run_notifier_loop(self): random.shuffle(all_streams) for stream in all_streams: - if stream.last_token == stream.upto_token: + if stream.last_token == stream.current_token(): continue if self._replication_torture_level: @@ -192,7 +187,7 @@ async def _run_notifier_loop(self): "Getting stream: %s: %s -> %s", stream.NAME, stream.last_token, - stream.upto_token, + stream.current_token(), ) try: updates, current_token = await stream.get_updates() diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 7a8b6e9df1d6..abf5c6c6a840 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -17,10 +17,12 @@ import itertools import logging from collections import namedtuple -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple import attr +from synapse.types import JsonDict + logger = logging.getLogger(__name__) @@ -119,13 +121,12 @@ class Stream(object): """Base class for the streams. Provides a `get_updates()` function that returns new updates since the last - time it was called up until the point `advance_current_token` was called. + time it was called. """ NAME = None # type: str # The name of the stream # The type of the row. Used by the default impl of parse_row. ROW_TYPE = None # type: Any - _LIMITED = True # Whether the update function takes a limit @classmethod def parse_row(cls, row): @@ -146,26 +147,15 @@ def __init__(self, hs): # The token from which we last asked for updates self.last_token = self.current_token() - # The token that we will get updates up to - self.upto_token = self.current_token() - - def advance_current_token(self): - """Updates `upto_token` to "now", which updates up until which point - get_updates[_since] will fetch rows till. - """ - self.upto_token = self.current_token() - def discard_updates_and_advance(self): """Called when the stream should advance but the updates would be discarded, e.g. when there are no currently connected workers. """ - self.upto_token = self.current_token() - self.last_token = self.upto_token + self.last_token = self.current_token() async def get_updates(self): """Gets all updates since the last time this function was called (or - since the stream was constructed if it hadn't been called before), - until the `upto_token` + since the stream was constructed if it hadn't been called before). Returns: Deferred[Tuple[List[Tuple[int, Any]], int]: @@ -178,44 +168,45 @@ async def get_updates(self): return updates, current_token - async def get_updates_since(self, from_token): + async def get_updates_since( + self, from_token: int + ) -> Tuple[List[Tuple[int, JsonDict]], int]: """Like get_updates except allows specifying from when we should stream updates Returns: - Deferred[Tuple[List[Tuple[int, Any]], int]: - Resolves to a pair ``(updates, current_token)``, where ``updates`` is a - list of ``(token, row)`` entries. ``row`` will be json-serialised and - sent over the replication steam. + Resolves to a pair `(updates, new_last_token)`, where `updates` is + a list of `(token, row)` entries and `new_last_token` is the new + position in stream. """ + if from_token in ("NOW", "now"): - return [], self.upto_token + return [], self.current_token() - current_token = self.upto_token + current_token = self.current_token() from_token = int(from_token) if from_token == current_token: return [], current_token - logger.info("get_updates_since: %s", self.__class__) - if self._LIMITED: - rows = await self.update_function( - from_token, current_token, limit=MAX_EVENTS_BEHIND + 1 - ) + rows = await self.update_function( + from_token, current_token, limit=MAX_EVENTS_BEHIND + 1 + ) - # never turn more than MAX_EVENTS_BEHIND + 1 into updates. - rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) - else: - rows = await self.update_function(from_token, current_token) + # never turn more than MAX_EVENTS_BEHIND + 1 into updates. + rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) updates = [(row[0], row[1:]) for row in rows] # check we didn't get more rows than the limit. # doing it like this allows the update_function to be a generator. - if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND: + if len(updates) >= MAX_EVENTS_BEHIND: raise Exception("stream %s has fallen behind" % (self.NAME)) + # The update function didn't hit the limit, so we must have got all + # the updates to `current_token`, and can return that as our new + # stream position. return updates, current_token def current_token(self): @@ -227,9 +218,8 @@ def current_token(self): """ raise NotImplementedError() - def update_function(self, from_token, current_token, limit=None): - """Get updates between from_token and to_token. If Stream._LIMITED is - True then limit is provided, otherwise it's not. + def update_function(self, from_token, current_token, limit): + """Get updates between from_token and to_token. Returns: Deferred(list(tuple)): the first entry in the tuple is the token for @@ -257,7 +247,6 @@ def __init__(self, hs): class PresenceStream(Stream): NAME = "presence" - _LIMITED = False ROW_TYPE = PresenceStreamRow def __init__(self, hs): @@ -272,7 +261,6 @@ def __init__(self, hs): class TypingStream(Stream): NAME = "typing" - _LIMITED = False ROW_TYPE = TypingStreamRow def __init__(self, hs): @@ -372,7 +360,6 @@ class DeviceListsStream(Stream): """ NAME = "device_lists" - _LIMITED = False ROW_TYPE = DeviceListsStreamRow def __init__(self, hs): @@ -462,7 +449,6 @@ class UserSignatureStream(Stream): """ NAME = "user_signature" - _LIMITED = False ROW_TYPE = UserSignatureStreamRow def __init__(self, hs): diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 023b70bb09a2..4926566c94d4 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -580,7 +580,7 @@ def get_users_whose_signatures_changed(self, user_id, from_key): return set() async def get_all_device_list_changes_for_remotes( - self, from_key: int, to_key: int + self, from_key: int, to_key: int, limit: int, ) -> List[Tuple[int, str]]: """Return a list of `(stream_id, entity)` which is the combined list of changes to devices and which destinations need to be poked. Entity is @@ -596,10 +596,16 @@ async def get_all_device_list_changes_for_remotes( SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes ) AS e WHERE ? < stream_id AND stream_id <= ? + LIMIT ? """ return await self.db.execute( - "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key + "get_all_device_list_changes_for_remotes", + None, + sql, + from_key, + to_key, + limit, ) @cached(max_entries=10000) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 001a53f9b4bf..bcf746b7ef9b 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -537,7 +537,7 @@ def get_e2e_cross_signing_keys_bulk( return result - def get_all_user_signature_changes_for_remotes(self, from_key, to_key): + def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit): """Return a list of changes from the user signature stream to notify remotes. Note that the user signature stream represents when a user signs their device with their user-signing key, which is not published to other @@ -552,13 +552,19 @@ def get_all_user_signature_changes_for_remotes(self, from_key, to_key): Deferred[list[(int,str)]] a list of `(stream_id, user_id)` """ sql = """ - SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id + SELECT stream_id, from_user_id AS user_id FROM user_signature_stream WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id + ORDER BY stream_id ASC + LIMIT ? """ return self.db.execute( - "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key + "get_all_user_signature_changes_for_remotes", + None, + sql, + from_key, + to_key, + limit, ) diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py index 604c8b7ddd9b..dab31e0c2ded 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py @@ -60,7 +60,7 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states): "status_msg": state.status_msg, "currently_active": state.currently_active, } - for state in presence_states + for stream_id, state in zip(stream_orderings, presence_states) ], ) @@ -73,19 +73,22 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states): ) txn.execute(sql + clause, [stream_id] + list(args)) - def get_all_presence_updates(self, last_id, current_id): + def get_all_presence_updates(self, last_id, current_id, limit): if last_id == current_id: return defer.succeed([]) def get_all_presence_updates_txn(txn): - sql = ( - "SELECT stream_id, user_id, state, last_active_ts," - " last_federation_update_ts, last_user_sync_ts, status_msg," - " currently_active" - " FROM presence_stream" - " WHERE ? < stream_id AND stream_id <= ?" - ) - txn.execute(sql, (last_id, current_id)) + sql = """ + SELECT stream_id, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, + status_msg, + currently_active + FROM presence_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) return txn.fetchall() return self.db.runInteraction(