Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Replace uses of simple_insert_many with simple_insert_many_values. (#11742)
Browse files Browse the repository at this point in the history

This should be (slightly) more efficient and it is simpler
to have a single method for inserting multiple values.
  • Loading branch information
clokep authored Jan 14, 2022
1 parent d70169b commit 3e0536c
Show file tree
Hide file tree
Showing 19 changed files with 263 additions and 298 deletions.
1 change: 1 addition & 0 deletions changelog.d/11742.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minor efficiency improvements when inserting many values into the database.
44 changes: 18 additions & 26 deletions synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,41 +123,33 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
job_name = body["job_name"]

if job_name == "populate_stats_process_rooms":
jobs = [
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
},
]
jobs = [("populate_stats_process_rooms", "{}", "")]
elif job_name == "regenerate_directory":
jobs = [
{
"update_name": "populate_user_directory_createtables",
"progress_json": "{}",
"depends_on": "",
},
{
"update_name": "populate_user_directory_process_rooms",
"progress_json": "{}",
"depends_on": "populate_user_directory_createtables",
},
{
"update_name": "populate_user_directory_process_users",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_rooms",
},
{
"update_name": "populate_user_directory_cleanup",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_users",
},
("populate_user_directory_createtables", "{}", ""),
(
"populate_user_directory_process_rooms",
"{}",
"populate_user_directory_createtables",
),
(
"populate_user_directory_process_users",
"{}",
"populate_user_directory_process_rooms",
),
(
"populate_user_directory_cleanup",
"{}",
"populate_user_directory_process_users",
),
]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")

try:
await self._store.db_pool.simple_insert_many(
table="background_updates",
keys=("update_name", "progress_json", "depends_on"),
values=jobs,
desc=f"admin_api_run_{job_name}",
)
Expand Down
54 changes: 2 additions & 52 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,56 +934,6 @@ def simple_insert_txn(
txn.execute(sql, vals)

async def simple_insert_many(
self, table: str, values: List[Dict[str, Any]], desc: str
) -> None:
"""Executes an INSERT query on the named table.
The input is given as a list of dicts, with one dict per row.
Generally simple_insert_many_values should be preferred for new code.
Args:
table: string giving the table name
values: dict of new column names and values for them
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(desc, self.simple_insert_many_txn, table, values)

@staticmethod
def simple_insert_many_txn(
txn: LoggingTransaction, table: str, values: List[Dict[str, Any]]
) -> None:
"""Executes an INSERT query on the named table.
The input is given as a list of dicts, with one dict per row.
Generally simple_insert_many_values_txn should be preferred for new code.
Args:
txn: The transaction to use.
table: string giving the table name
values: dict of new column names and values for them
"""
if not values:
return

# This is a *slight* abomination to get a list of tuples of key names
# and a list of tuples of value names.
#
# i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
# => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
#
# The sort is to ensure that we don't rely on dictionary iteration
# order.
keys, vals = zip(
*(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i)
)

for k in keys:
if k != keys[0]:
raise RuntimeError("All items must have the same keys")

return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)

async def simple_insert_many_values(
self,
table: str,
keys: Collection[str],
Expand All @@ -1002,11 +952,11 @@ async def simple_insert_many_values(
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(
desc, self.simple_insert_many_values_txn, table, keys, values
desc, self.simple_insert_many_txn, table, keys, values
)

@staticmethod
def simple_insert_many_values_txn(
def simple_insert_many_txn(
txn: LoggingTransaction,
table: str,
keys: Collection[str],
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,9 @@ def _add_account_data_for_user(
self.db_pool.simple_insert_many_txn(
txn,
table="ignored_users",
keys=("ignorer_user_id", "ignored_user_id"),
values=[
{"ignorer_user_id": user_id, "ignored_user_id": u}
for u in currently_ignored_users - previously_ignored_users
(user_id, u) for u in currently_ignored_users - previously_ignored_users
],
)

Expand Down
30 changes: 16 additions & 14 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,21 @@ def add_messages_txn(txn, now_ms, stream_id):
self.db_pool.simple_insert_many_txn(
txn,
table="device_federation_outbox",
keys=(
"destination",
"stream_id",
"queued_ts",
"messages_json",
"instance_name",
),
values=[
{
"destination": destination,
"stream_id": stream_id,
"queued_ts": now_ms,
"messages_json": json_encoder.encode(edu),
"instance_name": self._instance_name,
}
(
destination,
stream_id,
now_ms,
json_encoder.encode(edu),
self._instance_name,
)
for destination, edu in remote_messages_by_destination.items()
],
)
Expand Down Expand Up @@ -571,14 +578,9 @@ def _add_messages_to_local_device_inbox_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_inbox",
keys=("user_id", "device_id", "stream_id", "message_json", "instance_name"),
values=[
{
"user_id": user_id,
"device_id": device_id,
"stream_id": stream_id,
"message_json": message_json,
"instance_name": self._instance_name,
}
(user_id, device_id, stream_id, message_json, self._instance_name)
for user_id, messages_by_device in local_by_user_then_device.items()
for device_id, message_json in messages_by_device.items()
],
Expand Down
37 changes: 22 additions & 15 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1386,12 +1386,9 @@ def _update_remote_device_list_cache_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_remote_cache",
keys=("user_id", "device_id", "content"),
values=[
{
"user_id": user_id,
"device_id": content["device_id"],
"content": json_encoder.encode(content),
}
(user_id, content["device_id"], json_encoder.encode(content))
for content in devices
],
)
Expand Down Expand Up @@ -1479,8 +1476,9 @@ def _add_device_change_to_stream_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_stream",
keys=("stream_id", "user_id", "device_id"),
values=[
{"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
(stream_id, user_id, device_id)
for stream_id, device_id in zip(stream_ids, device_ids)
],
)
Expand All @@ -1507,18 +1505,27 @@ def _add_device_outbound_poke_to_stream_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
keys=(
"destination",
"stream_id",
"user_id",
"device_id",
"sent",
"ts",
"opentracing_context",
),
values=[
{
"destination": destination,
"stream_id": next(next_stream_id),
"user_id": user_id,
"device_id": device_id,
"sent": False,
"ts": now,
"opentracing_context": json_encoder.encode(context)
(
destination,
next(next_stream_id),
user_id,
device_id,
False,
now,
json_encoder.encode(context)
if whitelisted_homeserver(destination)
else "{}",
}
)
for destination in hosts
for device_id in device_ids
],
Expand Down
6 changes: 2 additions & 4 deletions synapse/storage/databases/main/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,8 @@ def alias_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_insert_many_txn(
txn,
table="room_alias_servers",
values=[
{"room_alias": room_alias.to_string(), "server": server}
for server in servers
],
keys=("room_alias", "server"),
values=[(room_alias.to_string(), server) for server in servers],
)

self._invalidate_cache_and_stream(
Expand Down
34 changes: 23 additions & 11 deletions synapse/storage/databases/main/e2e_room_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ async def add_e2e_room_keys(
values = []
for (room_id, session_id, room_key) in room_keys:
values.append(
{
"user_id": user_id,
"version": version_int,
"room_id": room_id,
"session_id": session_id,
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json_encoder.encode(room_key["session_data"]),
}
(
user_id,
version_int,
room_id,
session_id,
room_key["first_message_index"],
room_key["forwarded_count"],
room_key["is_verified"],
json_encoder.encode(room_key["session_data"]),
)
)
log_kv(
{
Expand All @@ -131,7 +131,19 @@ async def add_e2e_room_keys(
)

await self.db_pool.simple_insert_many(
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
table="e2e_room_keys",
keys=(
"user_id",
"version",
"room_id",
"session_id",
"first_message_index",
"forwarded_count",
"is_verified",
"session_data",
),
values=values,
desc="add_e2e_room_keys",
)

@trace
Expand Down
42 changes: 25 additions & 17 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,16 @@ def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
self.db_pool.simple_insert_many_txn(
txn,
table="e2e_one_time_keys_json",
keys=(
"user_id",
"device_id",
"algorithm",
"key_id",
"ts_added_ms",
"key_json",
),
values=[
{
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
"ts_added_ms": time_now,
"key_json": json_bytes,
}
(user_id, device_id, algorithm, key_id, time_now, json_bytes)
for algorithm, key_id, json_bytes in new_keys
],
)
Expand Down Expand Up @@ -1186,15 +1187,22 @@ async def store_e2e_cross_signing_signatures(
"""
await self.db_pool.simple_insert_many(
"e2e_cross_signing_signatures",
[
{
"user_id": user_id,
"key_id": item.signing_key_id,
"target_user_id": item.target_user_id,
"target_device_id": item.target_device_id,
"signature": item.signature,
}
keys=(
"user_id",
"key_id",
"target_user_id",
"target_device_id",
"signature",
),
values=[
(
user_id,
item.signing_key_id,
item.target_user_id,
item.target_device_id,
item.signature,
)
for item in signatures
],
"add_e2e_signing_key",
desc="add_e2e_signing_key",
)
Loading

0 comments on commit 3e0536c

Please sign in to comment.