From 23befb463217c4d206e6887650c1ada2b179c0c7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 13:17:42 +0000 Subject: [PATCH 1/7] Add table to track position of last converted device list change Add `device_lists_changes_converted_stream_position` table to track the `(stream id, room id)` of the last row in `device_lists_changes_in_room` that has been converted to `device_lists_outbound_pokes`. Signed-off-by: Sean Quah --- synapse/storage/databases/main/devices.py | 34 +++++++++++ .../12refactor_device_list_outbound_pokes.sql | 56 +++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 57230df5aec5..50e5509eaeb8 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -2156,3 +2156,37 @@ def get_pending_remote_device_list_updates_for_room_txn( "get_pending_remote_device_list_updates_for_room", get_pending_remote_device_list_updates_for_room_txn, ) + + async def get_device_change_last_converted_pos(self) -> Tuple[int, str]: + """ + Get the position of the last row in `device_list_changes_in_room` that has been + converted to `device_lists_outbound_pokes`. + + Rows with a strictly greater position where `converted_to_destinations` is + `FALSE` have not been converted. + """ + + row = await self.db_pool.simple_select_one( + table="device_lists_changes_converted_stream_position", + keyvalues={}, + retcols=["stream_id", "room_id"], + desc="get_device_change_last_converted_pos", + ) + return row["stream_id"], row["room_id"] + + async def set_device_change_last_converted_pos( + self, + stream_id: int, + room_id: str, + ) -> None: + """ + Set the position of the last row in `device_list_changes_in_room` that has been + converted to `device_lists_outbound_pokes`. + """ + + await self.db_pool.simple_update_one( + table="device_lists_changes_converted_stream_position", + keyvalues={}, + updatevalues={"stream_id": stream_id, "room_id": room_id}, + desc="set_device_change_last_converted_pos", + ) diff --git a/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql b/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql new file mode 100644 index 000000000000..b0abce3e8313 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql @@ -0,0 +1,56 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Prior to this schema delta, we tracked the set of unconverted rows in +-- `device_lists_changes_in_room` using the `converted_to_destinations` flag. When rows +-- were converted to `device_lists_outbound_pokes`, the `converted_to_destinations` flag +-- would be set. +-- +-- After this schema delta, the `converted_to_destinations` is still populated like +-- before, but the set of unconverted rows is determined by the `stream_id` in the new +-- `device_lists_changes_converted_stream_position` table. +-- +-- If rolled back, Synapse will re-send all device list changes that happened since the +-- schema delta. + +CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + -- The (stream id, room id) of the last row in `device_lists_changes_in_room` that + -- has been converted to `device_lists_outbound_pokes`. Rows with a strictly larger + -- (stream id, room id) where `converted_to_destinations` is `FALSE` have not been + -- converted. + stream_id BIGINT NOT NULL, + -- `room_id` may be an empty string, which compares less than all valid room IDs. + room_id TEXT NOT NULL, + CHECK (Lock='X') +); + +-- TODO: Add `room_id` to the index? +-- "device_lists_changes_in_stream_id_unconverted" btree (stream_id) WHERE NOT converted_to_destinations + +INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES ( + ( + SELECT COALESCE( + -- The last converted stream id is the smallest unconverted stream id minus + -- one. + MIN(stream_id) - 1, + -- If there is no unconverted stream id, the last converted stream id is the + -- largest stream id. + -- Otherwise, pick 1, since stream ids start at 2. + (SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room) + ) FROM device_lists_changes_in_room WHERE NOT converted_to_destinations + ), + '' +); From 437d6d15f15891ec83d1707a3891e260f9568056 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 13:18:57 +0000 Subject: [PATCH 2/7] Use (stream id, room id) position to fetch unconverted device changes Signed-off-by: Sean Quah --- synapse/handlers/device.py | 28 +++++++++++++++++++- synapse/storage/databases/main/devices.py | 32 +++++++++++++++++++---- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c597639a7ff5..9a5459fda938 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -682,13 +682,33 @@ async def _handle_new_device_update_async(self) -> None: hosts_already_sent_to: Set[str] = set() try: + stream_id, room_id = await self.store.get_device_change_last_converted_pos() + while True: self._handle_new_device_update_new_data = False - rows = await self.store.get_uncoverted_outbound_room_pokes() + max_stream_id = self.store.get_device_stream_token() + rows = await self.store.get_uncoverted_outbound_room_pokes( + stream_id, room_id + ) if not rows: # If the DB returned nothing then there is nothing left to # do, *unless* a new device list update happened during the # DB query. + + # Advance `(stream_id, room_id)`. + # `max_stream_id` comes from *before* the query for unconverted + # rows, which means that any unconverted rows must have a larger + # stream ID. + if max_stream_id > stream_id: + stream_id, room_id = max_stream_id, "" + await self.store.set_device_change_last_converted_pos( + stream_id, room_id + ) + else: + assert max_stream_id == stream_id + # Avoid moving `room_id` backwards. + pass + if self._handle_new_device_update_new_data: continue else: @@ -752,6 +772,12 @@ async def _handle_new_device_update_async(self) -> None: hosts_already_sent_to.update(hosts) current_stream_id = stream_id + # Advance `(stream_id, room_id)`. + _, _, room_id, stream_id, _ = rows[-1] + await self.store.set_device_change_last_converted_pos( + stream_id, room_id + ) + finally: self._handle_new_device_update_is_processing = False diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 50e5509eaeb8..43368ed70957 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -2008,27 +2008,49 @@ def _add_device_outbound_room_poke_txn( ) async def get_uncoverted_outbound_room_pokes( - self, limit: int = 10 + self, start_stream_id: int, start_room_id: str, limit: int = 10 ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: """Get device list changes by room that have not yet been handled and written to `device_lists_outbound_pokes`. + Args: + start_stream_id: Together with `start_room_id`, indicates the position after + which to return device list changes. + start_room_id: Together with `start_stream_id`, indicates the position after + which to return device list changes. + limit: The maximum number of device list changes to return. + Returns: - A list of user ID, device ID, room ID, stream ID and optional opentracing context. + A list of user ID, device ID, room ID, stream ID and optional opentracing + context, in order of ascending (stream ID, room ID). """ sql = """ SELECT user_id, device_id, room_id, stream_id, opentracing_context FROM device_lists_changes_in_room - WHERE NOT converted_to_destinations - ORDER BY stream_id + WHERE + (stream_id > ? OR (stream_id = ? AND room_id > ?)) AND + stream_id <= ? AND + NOT converted_to_destinations + ORDER BY stream_id ASC, room_id ASC LIMIT ? """ def get_uncoverted_outbound_room_pokes_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: - txn.execute(sql, (limit,)) + txn.execute( + sql, + ( + start_stream_id, + start_stream_id, + start_room_id, + # Avoid returning rows if there may be uncommitted device list + # changes with smaller stream IDs. + self._device_list_id_gen.get_current_token(), + limit, + ), + ) return [ ( From 4c6fd81d68b704089d6fe6efc4ee8a89c109e440 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 13:20:13 +0000 Subject: [PATCH 3/7] Stop updating the `converted_to_destinations` column From now on, the `converted_to_destinations` column indicates rows that need converting to outbound pokes, but does not indicate whether the conversion has already taken place. Signed-off-by: Sean Quah --- synapse/handlers/device.py | 2 -- synapse/storage/databases/main/devices.py | 42 +++++------------------ tests/storage/test_devices.py | 3 +- 3 files changed, 10 insertions(+), 37 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9a5459fda938..da3ddafeae5a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -738,7 +738,6 @@ async def _handle_new_device_update_async(self) -> None: user_id=user_id, device_id=device_id, room_id=room_id, - stream_id=stream_id, hosts=hosts, context=opentracing_context, ) @@ -860,7 +859,6 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None: user_id=user_id, device_id=device_id, room_id=room_id, - stream_id=None, hosts=potentially_changed_hosts, context=None, ) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 43368ed70957..5f67e419d478 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -2072,49 +2072,25 @@ async def add_device_list_outbound_pokes( user_id: str, device_id: str, room_id: str, - stream_id: Optional[int], hosts: Collection[str], context: Optional[Dict[str, str]], ) -> None: """Queue the device update to be sent to the given set of hosts, calculated from the room ID. - - Marks the associated row in `device_lists_changes_in_room` as handled, - if `stream_id` is provided. """ + if not hosts: + return def add_device_list_outbound_pokes_txn( txn: LoggingTransaction, stream_ids: List[int] ) -> None: - if hosts: - self._add_device_outbound_poke_to_stream_txn( - txn, - user_id=user_id, - device_id=device_id, - hosts=hosts, - stream_ids=stream_ids, - context=context, - ) - - if stream_id: - self.db_pool.simple_update_txn( - txn, - table="device_lists_changes_in_room", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - "stream_id": stream_id, - "room_id": room_id, - }, - updatevalues={"converted_to_destinations": True}, - ) - - if not hosts: - # If there are no hosts then we don't try and generate stream IDs. - return await self.db_pool.runInteraction( - "add_device_list_outbound_pokes", - add_device_list_outbound_pokes_txn, - [], + self._add_device_outbound_poke_to_stream_txn( + txn, + user_id=user_id, + device_id=device_id, + hosts=hosts, + stream_ids=stream_ids, + context=context, ) async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index f37505b6cf6a..8e7db2c4ec39 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -28,7 +28,7 @@ def add_device_change(self, user_id, device_ids, host): """ for device_id in device_ids: - stream_id = self.get_success( + self.get_success( self.store.add_device_change_to_streams( user_id, [device_id], ["!some:room"] ) @@ -39,7 +39,6 @@ def add_device_change(self, user_id, device_ids, host): user_id=user_id, device_id=device_id, room_id="!some:room", - stream_id=stream_id, hosts=[host], context={}, ) From ddc9999a632f735462bc79fbcda8493d61cd08e6 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 13:32:57 +0000 Subject: [PATCH 4/7] Add newsfile Signed-off-by: Sean Quah --- changelog.d/14516.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14516.misc diff --git a/changelog.d/14516.misc b/changelog.d/14516.misc new file mode 100644 index 000000000000..51666c6ffc6d --- /dev/null +++ b/changelog.d/14516.misc @@ -0,0 +1 @@ +Refactor conversion of device list changes in room to outbound pokes to track unconverted rows using a `(stream ID, room ID)` position instead of updating the `converted_to_destinations` flag on every row. From 9e70ba648a54405bc25de68b2faacecafc2fde75 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 14:17:13 +0000 Subject: [PATCH 5/7] Fix `simple_select_one_txn` to handle empty `WHERE` clause --- synapse/storage/database.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0dc44b246cbb..a14b13aec83e 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2075,13 +2075,14 @@ def simple_select_one_txn( retcols: Collection[str], allow_none: bool = False, ) -> Optional[Dict[str, Any]]: - select_sql = "SELECT %s FROM %s WHERE %s" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k,) for k in keyvalues), - ) + select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table) + + if keyvalues: + select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),) + txn.execute(select_sql, list(keyvalues.values())) + else: + txn.execute(select_sql) - txn.execute(select_sql, list(keyvalues.values())) row = txn.fetchone() if not row: From 112bc786031be60b1f20ad55c05e6fd82358cfb0 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 15:50:29 +0000 Subject: [PATCH 6/7] fixup: use tuple comparison in query --- synapse/storage/databases/main/devices.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 5f67e419d478..37629115ab51 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -2029,7 +2029,7 @@ async def get_uncoverted_outbound_room_pokes( SELECT user_id, device_id, room_id, stream_id, opentracing_context FROM device_lists_changes_in_room WHERE - (stream_id > ? OR (stream_id = ? AND room_id > ?)) AND + (stream_id, room_id) > (?, ?) AND stream_id <= ? AND NOT converted_to_destinations ORDER BY stream_id ASC, room_id ASC @@ -2042,7 +2042,6 @@ def get_uncoverted_outbound_room_pokes_txn( txn.execute( sql, ( - start_stream_id, start_stream_id, start_room_id, # Avoid returning rows if there may be uncommitted device list From b9b2f04edf1a8e5ba863794d10e28c67bf61c23b Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 22 Nov 2022 15:53:34 +0000 Subject: [PATCH 7/7] fixup: remove todo --- .../main/delta/73/12refactor_device_list_outbound_pokes.sql | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql b/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql index b0abce3e8313..93d7fcb79b8c 100644 --- a/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql +++ b/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql @@ -37,9 +37,6 @@ CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position( CHECK (Lock='X') ); --- TODO: Add `room_id` to the index? --- "device_lists_changes_in_stream_id_unconverted" btree (stream_id) WHERE NOT converted_to_destinations - INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES ( ( SELECT COALESCE(