Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Split up txn for fetching device keys (#15215)
Browse files Browse the repository at this point in the history
We look up keys in batches, but we should do that outside of the
transaction to avoid starving the database pool.
  • Loading branch information
erikjohnston authored Mar 7, 2023
1 parent 41f127e commit c69aae9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelog.d/15215.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor database transaction for query users' devices to reduce database pool contention.
10 changes: 9 additions & 1 deletion synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,15 @@ def new_transaction(
f = cast(types.FunctionType, func) # type: ignore[redundant-cast]
if f.__closure__:
for i, cell in enumerate(f.__closure__):
if inspect.isgenerator(cell.cell_contents):
try:
contents = cell.cell_contents
except ValueError:
# cell.cell_contents can raise if the "cell" is empty,
# which indicates that the variable is currently
# unbound.
continue

if inspect.isgenerator(contents):
logger.error(
"Programming error: function %s references generator %s "
"via its closure",
Expand Down
24 changes: 16 additions & 8 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ async def get_e2e_device_keys_and_signatures(
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)

result = await self.db_pool.runInteraction(
"get_e2e_device_keys",
self._get_e2e_device_keys_txn,
result = await self._get_e2e_device_keys(
query_list,
include_all_devices,
include_deleted_devices,
Expand Down Expand Up @@ -285,9 +283,8 @@ async def get_e2e_device_keys_and_signatures(
log_kv(result)
return result

def _get_e2e_device_keys_txn(
async def _get_e2e_device_keys(
self,
txn: LoggingTransaction,
query_list: Collection[Tuple[str, Optional[str]]],
include_all_devices: bool = False,
include_deleted_devices: bool = False,
Expand Down Expand Up @@ -319,7 +316,7 @@ def _get_e2e_device_keys_txn(

if user_list:
user_id_in_list_clause, user_args = make_in_list_sql_clause(
txn.database_engine, "user_id", user_list
self.database_engine, "user_id", user_list
)
query_clauses.append(user_id_in_list_clause)
query_params_list.append(user_args)
Expand All @@ -332,13 +329,16 @@ def _get_e2e_device_keys_txn(
user_device_id_in_list_clause,
user_device_args,
) = make_tuple_in_list_sql_clause(
txn.database_engine, ("user_id", "device_id"), user_device_batch
self.database_engine, ("user_id", "device_id"), user_device_batch
)
query_clauses.append(user_device_id_in_list_clause)
query_params_list.append(user_device_args)

result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
for query_clause, query_params in zip(query_clauses, query_params_list):

def get_e2e_device_keys_txn(
txn: LoggingTransaction, query_clause: str, query_params: list
) -> None:
sql = (
"SELECT user_id, device_id, "
" d.display_name, "
Expand All @@ -361,6 +361,14 @@ def _get_e2e_device_keys_txn(
display_name, db_to_json(key_json) if key_json else None
)

for query_clause, query_params in zip(query_clauses, query_params_list):
await self.db_pool.runInteraction(
"_get_e2e_device_keys",
get_e2e_device_keys_txn,
query_clause,
query_params,
)

if include_deleted_devices:
for user_id, device_id in deleted_devices:
if device_id is None:
Expand Down

0 comments on commit c69aae9

Please sign in to comment.