From 27cf170558865d0afeea5d4c2524e8664c28323b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 10:53:29 +0100 Subject: [PATCH 1/7] Refactor recent events func to use pagination func This also removes a cache that is unlikely to ever get hit. --- synapse/storage/stream.py | 75 ++++++++++++++------------------------- 1 file changed, 27 insertions(+), 48 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 54be0254018f..da43bb1321e2 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -38,7 +38,6 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore -from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -363,60 +362,40 @@ def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None) defer.returnValue((events, token)) - @cached(num_args=4) + @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): - end_token = RoomStreamToken.parse_stream_token(end_token) - - if from_token is None: - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" - ) - else: - from_token = RoomStreamToken.parse_stream_token(from_token) - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" - ) - - def get_recent_events_for_room_txn(txn): - if from_token is None: - txn.execute(sql, (room_id, end_token.stream, False, limit,)) - else: - txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, False, limit - )) + """Get the most recent events in the room in topological ordering. - rows = self.cursor_to_dict(txn) + Args: + room_id (str) + limit (int) + end_token (str): The stream token representing now. + from_token(str|None): Token to not return events before, if given. - rows.reverse() # As we selected with reverse ordering + Returns: + Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of + dicts (which include event_ids, etc), and a tuple for + `(start_token, end_token)` representing the range of rows + returned. + The returned events are in ascending order. + """ + # Allow a zero limit here, and no-op. + if limit == 0: + defer.returnValue(([], (end_token, end_token))) - if rows: - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # since we are going backwards so we subtract one from the - # stream part. - topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - 1 - start_token = str(RoomStreamToken(topo, toke)) + end_token = RoomStreamToken.parse_stream_token(end_token) + if from_token is not None: + from_token = RoomStreamToken.parse(from_token) - token = (start_token, str(end_token)) - else: - token = (str(end_token), str(end_token)) + rows, token = yield self.runInteraction( + "get_recent_event_ids_for_room", self._paginate_room_events_txn, + room_id, from_token=end_token, to_token=from_token, limit=limit, + ) - return rows, token + # We want to return the results in ascending order. + rows.reverse() - return self.runInteraction( - "get_recent_events_for_room", get_recent_events_for_room_txn - ) + defer.returnValue((rows, (token, str(end_token)))) def get_room_event_after_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or after a stream ordering From 7dd13415dbf82edeaae0b94f835e20025964e1b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 10:58:16 +0100 Subject: [PATCH 2/7] Remove unused from_token param --- synapse/storage/stream.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index da43bb1321e2..ecd39074b8e8 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -346,9 +346,9 @@ def f(txn): defer.returnValue(ret) @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): + def get_recent_events_for_room(self, room_id, limit, end_token): rows, token = yield self.get_recent_event_ids_for_room( - room_id, limit, end_token, from_token + room_id, limit, end_token, ) logger.debug("stream before") @@ -363,14 +363,13 @@ def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None) defer.returnValue((events, token)) @defer.inlineCallbacks - def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): + def get_recent_event_ids_for_room(self, room_id, limit, end_token): """Get the most recent events in the room in topological ordering. Args: room_id (str) limit (int) end_token (str): The stream token representing now. - from_token(str|None): Token to not return events before, if given. Returns: Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of @@ -384,12 +383,10 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=No defer.returnValue(([], (end_token, end_token))) end_token = RoomStreamToken.parse_stream_token(end_token) - if from_token is not None: - from_token = RoomStreamToken.parse(from_token) rows, token = yield self.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, - room_id, from_token=end_token, to_token=from_token, limit=limit, + room_id, from_token=end_token, limit=limit, ) # We want to return the results in ascending order. From 05e0a2462c76be6987c7ec3d9517d500583bac65 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:18:23 +0100 Subject: [PATCH 3/7] Refactor pagination DB API to return concrete type This makes it easier to document what is being returned by the storage functions and what some functions expect as arguments. --- synapse/storage/stream.py | 76 ++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 28 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ecd39074b8e8..772d2c619877 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -47,6 +47,7 @@ import logging from six.moves import range +from collections import namedtuple logger = logging.getLogger(__name__) @@ -59,6 +60,12 @@ _TOPOLOGICAL_TOKEN = "topological" +# Used as return values for pagination APIs +_EventDictReturn = namedtuple("_EventDictReturn", ( + "event_id", "topological_ordering", "stream_ordering", +)) + + def lower_bound(token, engine, inclusive=False): inclusive = "=" if inclusive else "" if token.topological is None: @@ -256,9 +263,13 @@ def f(txn): " ORDER BY stream_ordering %s LIMIT ?" ) % (order,) txn.execute(sql, (room_id, from_id, to_id, limit)) + + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] else: sql = ( - "SELECT event_id, stream_ordering FROM events WHERE" + "SELECT event_id, topological_ordering, stream_ordering" + " FROM events" + " WHERE" " room_id = ?" " AND not outlier" " AND stream_ordering <= ?" @@ -266,14 +277,14 @@ def f(txn): ) % (order, order,) txn.execute(sql, (room_id, to_id, limit)) - rows = self.cursor_to_dict(txn) + rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] return rows rows = yield self.runInteraction("get_room_events_stream_for_room", f) ret = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) @@ -283,7 +294,7 @@ def f(txn): ret.reverse() if rows: - key = "s%d" % min(r["stream_ordering"] for r in rows) + key = "s%d" % min(r.stream_ordering for r in rows) else: # Assume we didn't get anything because there was nothing to # get. @@ -330,14 +341,15 @@ def f(txn): " ORDER BY stream_ordering ASC" ) txn.execute(sql, (user_id, to_id,)) - rows = self.cursor_to_dict(txn) + + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] return rows rows = yield self.runInteraction("get_membership_changes_for_user", f) ret = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) @@ -353,14 +365,14 @@ def get_recent_events_for_room(self, room_id, limit, end_token): logger.debug("stream before") events = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) logger.debug("stream after") self._set_before_and_after(events, rows) - defer.returnValue((events, token)) + defer.returnValue((events, (token, end_token))) @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token): @@ -372,15 +384,14 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token): end_token (str): The stream token representing now. Returns: - Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of - dicts (which include event_ids, etc), and a tuple for - `(start_token, end_token)` representing the range of rows - returned. - The returned events are in ascending order. + Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of + _EventDictReturn and a token pointint to the start of the returned + events. + The events returned are in ascending order. """ # Allow a zero limit here, and no-op. if limit == 0: - defer.returnValue(([], (end_token, end_token))) + defer.returnValue(([], end_token)) end_token = RoomStreamToken.parse_stream_token(end_token) @@ -392,7 +403,7 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token): # We want to return the results in ascending order. rows.reverse() - defer.returnValue((rows, (token, str(end_token)))) + defer.returnValue((rows, token)) def get_room_event_after_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or after a stream ordering @@ -496,10 +507,19 @@ def _get_max_topological_txn(self, txn, room_id): @staticmethod def _set_before_and_after(events, rows, topo_order=True): + """Inserts ordering information to events' internal metadata from + the DB rows. + + Args: + events (list[FrozenEvent]) + rows (list[_EventDictReturn]) + topo_order (bool): Whether the events were ordered topologically + or by stream ordering + """ for event, row in zip(events, rows): - stream = row["stream_ordering"] - if topo_order: - topo = event.depth + stream = row.stream_ordering + if topo_order and row.topological_ordering: + topo = row.topological_ordering else: topo = None internal = event.internal_metadata @@ -586,12 +606,12 @@ def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_lim rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, ) - events_before = [r["event_id"] for r in rows] + events_before = [r.event_id for r in rows] rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, ) - events_after = [r["event_id"] for r in rows] + events_after = [r.event_id for r in rows] return { "before": { @@ -672,9 +692,9 @@ def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, those that match the filter. Returns: - tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have - the keys "event_id", "toplogical_ordering" and "stream_ordering". + tuple[list[_EventDictReturn], str]: Returns the results as a list + of _EventDictReturn and a token that points to the end of the + result set. """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -725,11 +745,11 @@ def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, txn.execute(sql, args) - rows = self.cursor_to_dict(txn) + rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] + topo = rows[-1].topological_ordering + toke = rows[-1].stream_ordering if direction == 'b': # Tokens are positions between events. # This token points *after* the last event in the chunk. @@ -764,7 +784,7 @@ def paginate_room_events(self, room_id, from_key, to_key=None, Returns: tuple[list[dict], str]: Returns the results as a list of dicts and a token that points to the end of the result set. The dicts have - the keys "event_id", "toplogical_ordering" and "stream_orderign". + the keys "event_id", "topological_ordering" and "stream_orderign". """ from_key = RoomStreamToken.parse(from_key) @@ -777,7 +797,7 @@ def paginate_room_events(self, room_id, from_key, to_key=None, ) events = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) From c4af4c24ca988832018feaf0ac5a2f6dbb8bfe68 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:55:34 +0100 Subject: [PATCH 4/7] Refactor get_recent_events_for_room return type There is no reason to return a tuple of tokens when the last token is always the token passed as an argument. Changing it makes it consistent with other storage APIs --- synapse/handlers/initial_sync.py | 10 +++++----- synapse/handlers/sync.py | 2 +- synapse/storage/stream.py | 16 +++++++++++++++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index cd33a86599df..5a9aa0c16de5 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -181,8 +181,8 @@ def handle_room(event): self.store, user_id, messages ) - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token) + end_token = now_token.copy_and_replace("room_key", room_end_token) time_now = self.clock.time_msec() d["messages"] = { @@ -325,8 +325,8 @@ def _room_initial_sync_parted(self, user_id, room_id, pagin_config, self.store, user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken.START.copy_and_replace("room_key", token[0]) - end_token = StreamToken.START.copy_and_replace("room_key", token[1]) + start_token = StreamToken.START.copy_and_replace("room_key", token) + end_token = StreamToken.START.copy_and_replace("room_key", stream_token) time_now = self.clock.time_msec() @@ -409,7 +409,7 @@ def get_receipts(): ) start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) + end_token = now_token time_now = self.clock.time_msec() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b52e4c2affa8..c25a76d215c6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -429,7 +429,7 @@ def get_state_at(self, room_id, stream_position): Returns: A Deferred map from ((type, state_key)->Event) """ - last_events, token = yield self.store.get_recent_events_for_room( + last_events, _ = yield self.store.get_recent_events_for_room( room_id, end_token=stream_position.room_key, limit=1, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 772d2c619877..b5baacd32c3e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -359,6 +359,20 @@ def f(txn): @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token): + """Get the most recent events in the room in topological ordering. + + Args: + room_id (str) + limit (int) + end_token (str): The stream token representing now. + + Returns: + Deferred[tuple[list[FrozenEvent], str]]: Returns a list of + events and a token pointint to the start of the returned + events. + The events returned are in ascending order. + """ + rows, token = yield self.get_recent_event_ids_for_room( room_id, limit, end_token, ) @@ -372,7 +386,7 @@ def get_recent_events_for_room(self, room_id, limit, end_token): self._set_before_and_after(events, rows) - defer.returnValue((events, (token, end_token))) + defer.returnValue((events, token)) @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token): From e5ab9cd24b2f63c6ca00ae6354dbcbfcd9127dfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:58:35 +0100 Subject: [PATCH 5/7] Don't unnecessarily require token to be stream token This allows calling the `get_recent_event_ids_for_room` function in more situations. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b5baacd32c3e..5e4327bb9659 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -407,7 +407,7 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token): if limit == 0: defer.returnValue(([], end_token)) - end_token = RoomStreamToken.parse_stream_token(end_token) + end_token = RoomStreamToken.parse(end_token) rows, token = yield self.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, From e2accd7f1d21e34181dd4543eca30ad1ea971b4c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:59:45 +0100 Subject: [PATCH 6/7] Refactor sync APIs to reuse pagination API The sync API often returns events in a topological rather than stream ordering, e.g. when the user joined the room or on initial sync. When this happens we can reuse existing pagination storage functions. --- synapse/handlers/sync.py | 19 ++++++---- synapse/storage/stream.py | 73 +++++++++++++++++++-------------------- 2 files changed, 48 insertions(+), 44 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c25a76d215c6..b75daa340db4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -354,12 +354,19 @@ def _load_filtered_recents(self, room_id, sync_config, now_token, since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_room_events_stream_for_room( - room_id, - limit=load_limit + 1, - from_key=since_key, - to_key=end_key, - ) + if since_key: + events, end_key = yield self.store.get_room_events_stream_for_room( + room_id, + limit=load_limit + 1, + from_key=since_key, + to_key=end_key, + ) + else: + events, end_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + end_token=end_key, + ) loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5e4327bb9659..8bb4e857095d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -233,52 +233,49 @@ def get_rooms_that_changed(self, room_ids, from_key): @defer.inlineCallbacks def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, order='DESC'): - # Note: If from_key is None then we return in topological order. This - # is because in that case we're using this as a "get the last few messages - # in a room" function, rather than "get new messages since last sync" - if from_key is not None: - from_id = RoomStreamToken.parse_stream_token(from_key).stream - else: - from_id = None - to_id = RoomStreamToken.parse_stream_token(to_key).stream + """Get new room events in stream ordering since `from_key`. + + Args: + room_id (str) + from_key (str): Token from which no events are returned before + to_key (str): Token from which no events are returned after. (This + is typically the current stream token) + limit (int): Maximum number of events to return + order (str): Either "DESC" or "ASC". Determines which events are + returned when the result is limited. If "DESC" then the most + recent `limit` events are returned, otherwise returns the + oldest `limit` events. + + Returns: + Deferred[tuple[list[FrozenEvent], str]]: Returns the list of + events (in ascending order) and the token from the start of + the chunk of events returned. + """ if from_key == to_key: defer.returnValue(([], from_key)) - if from_id: - has_changed = yield self._events_stream_cache.has_entity_changed( - room_id, from_id - ) + from_id = RoomStreamToken.parse_stream_token(from_key).stream + to_id = RoomStreamToken.parse_stream_token(to_key).stream - if not has_changed: - defer.returnValue(([], from_key)) + has_changed = yield self._events_stream_cache.has_entity_changed( + room_id, from_id + ) - def f(txn): - if from_id is not None: - sql = ( - "SELECT event_id, stream_ordering FROM events WHERE" - " room_id = ?" - " AND not outlier" - " AND stream_ordering > ? AND stream_ordering <= ?" - " ORDER BY stream_ordering %s LIMIT ?" - ) % (order,) - txn.execute(sql, (room_id, from_id, to_id, limit)) - - rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] - else: - sql = ( - "SELECT event_id, topological_ordering, stream_ordering" - " FROM events" - " WHERE" - " room_id = ?" - " AND not outlier" - " AND stream_ordering <= ?" - " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?" - ) % (order, order,) - txn.execute(sql, (room_id, to_id, limit)) + if not has_changed: + defer.returnValue(([], from_key)) - rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] + def f(txn): + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering %s LIMIT ?" + ) % (order,) + txn.execute(sql, (room_id, from_id, to_id, limit)) + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] return rows rows = yield self.runInteraction("get_room_events_stream_for_room", f) From d26bec8a437e44a05febd02473a8dcd8ff2c9f58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 15:56:07 +0100 Subject: [PATCH 7/7] Add comment to sync as to why code path is split --- synapse/handlers/sync.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b75daa340db4..263e42dded09 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -354,6 +354,11 @@ def _load_filtered_recents(self, room_id, sync_config, now_token, since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: + # If we have a since_key then we are trying to get any events + # that have happened since `since_key` up to `end_key`, so we + # can just use `get_room_events_stream_for_room`. + # Otherwise, we want to return the last N events in the room + # in toplogical ordering. if since_key: events, end_key = yield self.store.get_room_events_stream_for_room( room_id,