Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise _update_client_ips_batch_txn to batch together database operations. #12252

Merged
merged 25 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7cf8b2e
Upsert many client IPs at once, more efficiently
reivilibre Mar 18, 2022
c7cfbf5
Newsfile
reivilibre Mar 18, 2022
45c98e0
Add a simple update many txn
reivilibre Mar 24, 2022
0646b4b
Add non-txn simple update many
reivilibre Mar 24, 2022
1acd8cd
Use simple_update_many for devices
reivilibre Mar 25, 2022
1e961ad
Isolate locals by splitting into two smaller functions
reivilibre Mar 25, 2022
503f5c8
Make _dump_to_tuple dump the entire table without undue ceremony
reivilibre Mar 28, 2022
5f29099
Rename some things to clarify
reivilibre Mar 28, 2022
e7c4907
Add a test for simple_update_many
reivilibre Mar 28, 2022
5675a94
Add an exception for when the number of value rows and key rows don't…
reivilibre Mar 28, 2022
e7985d2
Antilint
reivilibre Apr 6, 2022
7edf6f7
Add lock=True for the emulated many-upsert case
reivilibre Apr 7, 2022
9556aae
Don't double-lock the table
reivilibre Apr 7, 2022
303fba6
Inline column names
reivilibre Apr 7, 2022
ac4b1d5
Flatten user_ips builder
reivilibre Apr 7, 2022
34403cb
Flatten devices builder
reivilibre Apr 7, 2022
0f8e98b
Fix up docstring
reivilibre Apr 7, 2022
481b730
Remove dead variable
reivilibre Apr 7, 2022
99e6b66
Don't return for None
reivilibre Apr 7, 2022
3f7f659
Update synapse/storage/database.py
reivilibre Apr 7, 2022
50f2b91
Bail out when there's nothing to do
reivilibre Apr 7, 2022
93e1237
Apply suggestions from code review
reivilibre Apr 8, 2022
c0aaec4
Antilint
reivilibre Apr 8, 2022
c456958
Lock only once for batch emulated upserts
reivilibre Apr 8, 2022
214aacf
No need to manually lock
reivilibre Apr 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12252.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move `update_client_ip` background job from the main process to the background worker.
72 changes: 72 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,78 @@ def simple_update_txn(

return txn.rowcount

async def simple_update_many(
    self,
    table: str,
    key_names: Collection[str],
    key_values: Collection[Iterable[Any]],
    value_names: Collection[str],
    value_values: Collection[Iterable[Any]],
    desc: str,
) -> None:
    """
    Update, many times, using batching where possible.

    Thin async wrapper: hands `simple_update_many_txn` and the arguments
    below to `self.runInteraction`.

    Args:
        table: The table to update.
        key_names: The key column names.
        key_values: A list of each row's key column values.
        value_names: The names of value columns to update.
        value_values: A list of each row's value column values. Must be a
            sized collection (not a one-shot iterator), because
            `simple_update_many_txn` compares its length against
            `key_values`.
        desc: Passed to `runInteraction` as the description of the
            interaction.
    """
    # `simple_update_many_txn` returns nothing, so there is no result to
    # propagate to the caller.
    await self.runInteraction(
        desc,
        self.simple_update_many_txn,
        table,
        key_names,
        key_values,
        value_names,
        value_values,
    )

@staticmethod
def simple_update_many_txn(
    txn: LoggingTransaction,
    table: str,
    key_names: Collection[str],
    key_values: Collection[Iterable[Any]],
    value_names: Collection[str],
    value_values: Collection[Iterable[Any]],
) -> None:
    """
    Update, many times, using batching where possible.

    Builds a single `UPDATE ... SET ... WHERE ...` statement and passes it
    to `txn.execute_batch` with one argument tuple per row. This is an
    update, not an upsert: the statement only contains `UPDATE`, so no rows
    are ever inserted.

    Args:
        txn: The transaction to execute the batch on.
        table: The table to update.
        key_names: The key column names.
        key_values: A list of each row's key column values.
        value_names: The names of value columns to update.
        value_values: A list of each row's value column values.

    Raises:
        ValueError: if `key_values` and `value_values` do not contain the
            same number of rows.
    """

    if len(value_values) != len(key_values):
        raise ValueError(
            f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number."
        )

    # One argument tuple per row: value column values first, then key column
    # values, matching the placeholder order of the query built below.
    # Each row must appear exactly once, otherwise every UPDATE would be
    # executed more than once.
    args = [tuple(vs) + tuple(ks) for vs, ks in zip(value_values, key_values)]

    # 'col1 = ?, col2 = ?, ...'
    set_clause = ", ".join(f"{n} = ?" for n in value_names)

    if key_names:
        # 'WHERE col3 = ? AND col4 = ? AND col5 = ?'
        where_clause = "WHERE " + (" AND ".join(f"{n} = ?" for n in key_names))
    else:
        where_clause = ""

    # UPDATE mytable SET col1 = ?, col2 = ? WHERE col3 = ? AND col4 = ?
    sql = f"""
        UPDATE {table} SET {set_clause} {where_clause}
    """

    txn.execute_batch(sql, args)

async def simple_update_one(
self,
table: str,
Expand Down
74 changes: 47 additions & 27 deletions synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,37 +634,57 @@ def _update_client_ips_batch_txn(
):
self.database_engine.lock_table(txn, "user_ips")

for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry

self.db_pool.simple_upsert_txn(
def update_user_ips() -> None:
# Keys and values for the `user_ips` upsert.
key_columns = "user_id", "access_token", "ip"
keys = []
value_columns = "user_agent", "device_id", "last_seen"
values = []

for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
keys.append((user_id, access_token, ip))
values.append((user_agent, device_id, last_seen))

self.db_pool.simple_upsert_many_txn(
txn,
table="user_ips",
keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
values={
"user_agent": user_agent,
"device_id": device_id,
"last_seen": last_seen,
},
lock=False,
key_names=key_columns,
key_values=keys,
value_names=value_columns,
value_values=values,
# TODO lock=False
)

# Technically an access token might not be associated with
# a device so we need to check.
if device_id:
# this is always an update rather than an upsert: the row should
# already exist, and if it doesn't, that may be because it has been
# deleted, and we don't want to re-create it.
self.db_pool.simple_update_txn(
txn,
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
updatevalues={
"user_agent": user_agent,
"last_seen": last_seen,
"ip": ip,
},
)
def update_devices() -> None:
    """
    Batch-update the `devices` table from the entries in `to_update`.

    For every entry that carries a device ID, the row keyed by
    `(user_id, device_id)` gets its `user_agent`, `last_seen` and `ip`
    columns set, via a single `simple_update_many_txn` call.
    """
    device_keys = []
    device_values = []

    for (user_id, _access_token, ip), (
        user_agent,
        device_id,
        last_seen,
    ) in to_update.items():
        # Technically an access token might not be associated with
        # a device, so entries without a device ID are skipped.
        if not device_id:
            continue

        device_keys.append((user_id, device_id))
        device_values.append((user_agent, last_seen, ip))

    self.db_pool.simple_update_many_txn(
        txn,
        table="devices",
        key_names=("user_id", "device_id"),
        key_values=device_keys,
        value_names=("user_agent", "last_seen", "ip"),
        value_values=device_values,
    )

# This update is split into two smaller functions so that we can
# be sure their locals don't overlap
update_user_ips()
update_devices()
reivilibre marked this conversation as resolved.
Show resolved Hide resolved

async def get_last_client_ip_by_device(
self, user_id: str, device_id: Optional[str]
Expand Down
73 changes: 57 additions & 16 deletions tests/storage/test__base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import secrets
from typing import Any, Dict, Generator, List, Tuple
from typing import Generator, Tuple

from twisted.test.proto_helpers import MemoryReactor

Expand All @@ -24,7 +24,7 @@
from tests import unittest


class UpsertManyTests(unittest.HomeserverTestCase):
class UpdateUpsertManyTests(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.storage = hs.get_datastores().main

Expand All @@ -46,9 +46,13 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
)
)

def _dump_to_tuple(
self, res: List[Dict[str, Any]]
) -> Generator[Tuple[int, str, str], None, None]:
def _dump_table_to_tuple(self) -> Generator[Tuple[int, str, str], None, None]:
    """
    Yield every row of the table named by `self.table_name` as an
    `(id, username, value)` tuple.

    The select is only issued once the generator is first iterated.
    """
    select_all = self.storage.db_pool.simple_select_list(
        self.table_name, None, ["id, username, value"]
    )
    rows = self.get_success(select_all)

    for row in rows:
        yield (row["id"], row["username"], row["value"])

Expand All @@ -75,13 +79,8 @@ def test_upsert_many(self) -> None:
)

# Check results are what we expect
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
)
)
self.assertEqual(
set(self._dump_to_tuple(res)),
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "there")},
)

Expand All @@ -102,12 +101,54 @@ def test_upsert_many(self) -> None:
)

# Check results are what we expect
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "bleb")},
)

def test_simple_update_many(self):
"""
simple_update_many performs many updates at once.
"""
# First add some data.
self.get_success(
self.storage.db_pool.simple_insert_many(
table=self.table_name,
keys=("id", "username", "value"),
values=[(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")],
desc="insert",
)
)

# Check the data made it to the table
self.assertEqual(
set(self._dump_to_tuple(res)),
{(1, "user1", "hello"), (2, "user2", "bleb")},
set(self._dump_table_to_tuple()),
{(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")},
)

# Now use simple_update_many
self.get_success(
self.storage.db_pool.simple_update_many(
table=self.table_name,
key_names=("username",),
key_values=(
("alice",),
("bob",),
("stranger",),
),
value_names=("value",),
value_values=(
("aaa!",),
("bbb!",),
("???",),
),
desc="update_many1",
)
)

# Check the table is how we expect:
# charlie has been left alone
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "alice", "aaa!"), (2, "bob", "bbb!"), (3, "charlie", "C")},
)