From 477b1ed6cfd130e5a004cda0c0b84509da2aa006 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Feb 2016 15:58:14 +0000 Subject: [PATCH 1/2] Fetch events in a separate transaction. This has a couple of benefits: - It reduces the time of transactions, allowing other database requests to run. - Fetching events is given a dedicated database thread, and so can't starve other database requests. --- synapse/storage/stream.py | 55 +++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a03458c2fce7..bcae3d718ea3 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -220,27 +220,29 @@ def f(txn): rows = self.cursor_to_dict(txn) - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows - self._set_before_and_after(ret, rows, topo_order=False) + rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret.reverse() + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - if rows: - key = "s%d" % min(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = from_key + self._set_before_and_after(ret, rows, topo_order=False) - return ret, key - res = yield self.runInteraction("get_room_events_stream_for_room", f) - defer.returnValue(res) + ret.reverse() + if rows: + key = "s%d" % min(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = from_key + + defer.returnValue((ret, key)) + + @defer.inlineCallbacks def get_room_changes_for_user(self, user_id, from_key, to_key): if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream @@ -249,14 +251,14 @@ def get_room_changes_for_user(self, user_id, from_key, to_key): to_id = RoomStreamToken.parse_stream_token(to_key).stream if from_key == to_key: - return defer.succeed([]) + defer.returnValue([]) if from_id: has_changed = self._membership_stream_cache.has_entity_changed( user_id, int(from_id) ) if not has_changed: - return defer.succeed([]) + defer.returnValue([]) def f(txn): if from_id is not None: @@ -281,17 +283,18 @@ def f(txn): txn.execute(sql, (user_id, to_id,)) rows = self.cursor_to_dict(txn) - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows + + rows = yield self.runInteraction("get_room_changes_for_user", f) - self._set_before_and_after(ret, rows, topo_order=False) + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return ret + self._set_before_and_after(ret, rows, topo_order=False) - return self.runInteraction("get_room_changes_for_user", f) + defer.returnValue(ret) def get_room_events_stream( self, From 8a391e33ae21f9a62c57cca8eea47435a14a6247 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Feb 2016 16:12:10 +0000 Subject: [PATCH 2/2] s/get_room_changes_for_user/get_membership_changes_for_user/ --- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/storage/stream.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 68e2c75a4833..799221c1984d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1013,7 +1013,7 @@ def get_new_events( limit=limit, ) else: - room_events = yield self.store.get_room_changes_for_user( + room_events = yield self.store.get_membership_changes_for_user( user.to_string(), from_key, to_key ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8d8d10da3342..dc686db541d4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -479,7 +479,7 @@ def incremental_sync_with_gap(self, sync_config, since_token): ) # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_room_changes_for_user( + rooms_changed = yield self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bcae3d718ea3..338a9d40d55a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -243,7 +243,7 @@ def f(txn): defer.returnValue((ret, key)) @defer.inlineCallbacks - def get_room_changes_for_user(self, user_id, from_key, to_key): + def get_membership_changes_for_user(self, user_id, from_key, to_key): if from_key is not None: from_id = RoomStreamToken.parse_stream_token(from_key).stream else: @@ -285,7 +285,7 @@ def f(txn): return rows - rows = yield self.runInteraction("get_room_changes_for_user", f) + rows = yield self.runInteraction("get_membership_changes_for_user", f) ret = yield self._get_events( [r["event_id"] for r in rows],