
Optimise _update_client_ips_batch_txn to batch together database operations. #12252

Merged
merged 25 commits
Apr 8, 2022
Changes from 21 commits

Commits (25)
7cf8b2e
Upsert many client IPs at once, more efficiently
reivilibre Mar 18, 2022
c7cfbf5
Newsfile
reivilibre Mar 18, 2022
45c98e0
Add a simple update many txn
reivilibre Mar 24, 2022
0646b4b
Add non-txn simple update many
reivilibre Mar 24, 2022
1acd8cd
Use simple_update_many for devices
reivilibre Mar 25, 2022
1e961ad
Isolate locals by splitting into two smaller functions
reivilibre Mar 25, 2022
503f5c8
Make _dump_to_tuple dump the entire table without undue ceremony
reivilibre Mar 28, 2022
5f29099
Rename some things to clarify
reivilibre Mar 28, 2022
e7c4907
Add a test for simple_update_many
reivilibre Mar 28, 2022
5675a94
Add an exception for when the number of value rows and key rows don't…
reivilibre Mar 28, 2022
e7985d2
Antilint
reivilibre Apr 6, 2022
7edf6f7
Add lock=True for the emulated many-upsert case
reivilibre Apr 7, 2022
9556aae
Don't double-lock the table
reivilibre Apr 7, 2022
303fba6
Inline column names
reivilibre Apr 7, 2022
ac4b1d5
Flatten user_ips builder
reivilibre Apr 7, 2022
34403cb
Flatten devices builder
reivilibre Apr 7, 2022
0f8e98b
Fix up docstring
reivilibre Apr 7, 2022
481b730
Remove dead variable
reivilibre Apr 7, 2022
99e6b66
Don't return for None
reivilibre Apr 7, 2022
3f7f659
Update synapse/storage/database.py
reivilibre Apr 7, 2022
50f2b91
Bail out when there's nothing to do
reivilibre Apr 7, 2022
93e1237
Apply suggestions from code review
reivilibre Apr 8, 2022
c0aaec4
Antilint
reivilibre Apr 8, 2022
c456958
Lock only once for batch emulated upserts
reivilibre Apr 8, 2022
214aacf
No need to manually lock
reivilibre Apr 8, 2022
1 change: 1 addition & 0 deletions changelog.d/12252.feature
@@ -0,0 +1 @@
Move `update_client_ip` background job from the main process to the background worker.
93 changes: 90 additions & 3 deletions synapse/storage/database.py
@@ -1268,6 +1268,7 @@ async def simple_upsert_many(
value_names: Collection[str],
value_values: Collection[Collection[Any]],
desc: str,
lock: bool = True,
Member:
Note that I think this is fine to default to True since simple_upsert_txn_emulated (which this eventually calls) defaults to True. So pretty much this was the default behavior.

Contributor Author (reivilibre):
Yeah, exactly — this is just providing a way to opt out of it.

) -> None:
"""
Upsert, many times.
@@ -1279,21 +1280,23 @@ async def simple_upsert_many(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
"""

# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
)

return await self.runInteraction(
await self.runInteraction(
desc,
self.simple_upsert_many_txn,
table,
key_names,
key_values,
value_names,
value_values,
lock=lock,
db_autocommit=autocommit,
)

@@ -1305,6 +1308,7 @@ def simple_upsert_many_txn(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times.
@@ -1316,14 +1320,15 @@ def simple_upsert_many_txn(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
"""
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
else:
return self.simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values
txn, table, key_names, key_values, value_names, value_values, lock=lock
)

def simple_upsert_many_txn_emulated(
@@ -1334,6 +1339,7 @@ def simple_upsert_many_txn_emulated(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
@@ -1345,6 +1351,7 @@ def simple_upsert_many_txn_emulated(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
@@ -1355,7 +1362,7 @@ def simple_upsert_many_txn_emulated(
_keys = {x: y for x, y in zip(key_names, keyv)}
_vals = {x: y for x, y in zip(value_names, valv)}

self.simple_upsert_txn_emulated(txn, table, _keys, _vals)
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=lock)
Member:
I'm guessing this works fine since we lock the table (as part of the transaction) in the first iteration of the loop, but then it stays locked until the end of the transaction?

  1. Is this accurate?
  2. If so, is there overhead to locking it for every value? (Should we only lock the table on the first iteration of the loop?)

Contributor Author (reivilibre):
  1. Yeah, the lock remains throughout the transaction, as far as I can tell.
  2. There may well be, hence adding the flag. Though I suppose you raise a good point that simple_upsert_many_txn_emulated could do that, rather than the calling code...

Note that, as it stands, there's no functional change to the code in the case that native upserts aren't supported. (I'm not even sure upon which databases this code will now run ... ancient SQLites? Do we even support any systems shipping such old SQLites nowadays?...).

Contributor Author (reivilibre):
Debian stretch (just about not EOL) ships an older version of SQLite without upserts, but buster (the next one along) has one modern enough. So I guess this code may not be entirely legacy yet.

Member:
@reivilibre judging from your commits my comment above was accurate, I think?
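
As an aside, the eventual resolution (see the later commits "Lock only once for batch emulated upserts" and "No need to manually lock") is to take the table lock a single time inside the emulated many-upsert rather than once per row or at the call site. Below is a minimal sketch of that shape, with simplified stand-in names: upsert_one and lock_table are placeholders for illustration, not the real Synapse helpers.

def upsert_many_emulated(txn, table, rows, upsert_one, lock_table, lock=True):
    # Take one LOCK TABLE for the whole batch; it is held until the
    # surrounding transaction commits or rolls back.
    if lock:
        lock_table(txn, table)
    for keys, values in rows:
        # Each per-row upsert is told not to lock again.
        upsert_one(txn, table, keys, values, lock=False)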


def simple_upsert_many_txn_native_upsert(
self,
@@ -1792,6 +1799,86 @@ def simple_update_txn(

return txn.rowcount

async def simple_update_many(
self,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
desc: str,
) -> None:
"""
Update, many times, using batching where possible.
If the keys don't match anything, nothing will be updated.

Args:
table: The table to update
key_names: The key column names.
key_values: A list of each row's key column values.
value_names: The names of value columns to update.
value_values: A list of each row's value column values.
"""

await self.runInteraction(
desc,
self.simple_update_many_txn,
table,
key_names,
key_values,
value_names,
value_values,
)

@staticmethod
def simple_update_many_txn(
txn: LoggingTransaction,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Collection[Iterable[Any]],
) -> None:
"""
Update, many times, using batching where possible.
If the keys don't match anything, nothing will be updated.

Args:
table: The table to update
key_names: The key column names.
key_values: A list of each row's key column values.
value_names: The names of value columns to update.
value_values: A list of each row's value column values.
"""

if len(value_values) != len(key_values):
raise ValueError(
f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number."
)

# List of tuples of (value values, then key values)
# (This matches the order needed for the query)
args = [tuple(x) + tuple(y) for x, y in zip(value_values, key_values)]

# 'col1 = ?, col2 = ?, ...'
set_clause = ", ".join(f"{n} = ?" for n in value_names)

if key_names:
# 'WHERE col3 = ? AND col4 = ? AND col5 = ?'
where_clause = "WHERE " + (" AND ".join(f"{n} = ?" for n in key_names))
else:
where_clause = ""

# UPDATE mytable SET col1 = ?, col2 = ? WHERE col3 = ? AND col4 = ?
sql = f"""
UPDATE {table} SET {set_clause} {where_clause}
"""

txn.execute_batch(sql, args)
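
For illustration, here is a small standalone sketch of what the helper above produces. The profiles table, its column names and the rows are invented for this example rather than taken from the diff; the sketch just rebuilds the SQL string and argument list that get handed to txn.execute_batch.

# Standalone illustration only; table and data are made up.
key_names = ("user_id",)
key_values = [("@alice:example.org",), ("@bob:example.org",)]
value_names = ("displayname",)
value_values = [("Alice",), ("Bob",)]

# Same construction as simple_update_many_txn above.
args = [tuple(vv) + tuple(kv) for vv, kv in zip(value_values, key_values)]
set_clause = ", ".join(f"{n} = ?" for n in value_names)
where_clause = "WHERE " + " AND ".join(f"{n} = ?" for n in key_names)
sql = f"UPDATE profiles SET {set_clause} {where_clause}"

print(sql)   # UPDATE profiles SET displayname = ? WHERE user_id = ?
print(args)  # [('Alice', '@alice:example.org'), ('Bob', '@bob:example.org')]

The single UPDATE statement is then executed once per argument tuple by execute_batch, rather than issuing a separate UPDATE from the application for every row.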

async def simple_update_one(
self,
table: str,
64 changes: 36 additions & 28 deletions synapse/storage/databases/main/client_ips.py
@@ -616,9 +616,10 @@ async def _update_client_ips_batch(self) -> None:
to_update = self._batch_row_update
self._batch_row_update = {}

await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
if to_update:
await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

def _update_client_ips_batch_txn(
self,
@@ -634,37 +635,44 @@ def _update_client_ips_batch_txn(
):
self.database_engine.lock_table(txn, "user_ips")

# Keys and values for the `user_ips` upsert.
user_ips_keys = []
user_ips_values = []

# Keys and values for the `devices` update.
devices_keys = []
devices_values = []

for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry

self.db_pool.simple_upsert_txn(
txn,
table="user_ips",
keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
values={
"user_agent": user_agent,
"device_id": device_id,
"last_seen": last_seen,
},
lock=False,
)
user_ips_keys.append((user_id, access_token, ip))
user_ips_values.append((user_agent, device_id, last_seen))

# Technically an access token might not be associated with
# a device so we need to check.
if device_id:
# this is always an update rather than an upsert: the row should
# already exist, and if it doesn't, that may be because it has been
# deleted, and we don't want to re-create it.
self.db_pool.simple_update_txn(
txn,
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
updatevalues={
"user_agent": user_agent,
"last_seen": last_seen,
"ip": ip,
},
)
devices_keys.append((user_id, device_id))
devices_values.append((user_agent, last_seen, ip))

self.db_pool.simple_upsert_many_txn(
txn,
table="user_ips",
key_names=("user_id", "access_token", "ip"),
key_values=user_ips_keys,
value_names=("user_agent", "device_id", "last_seen"),
value_values=user_ips_values,
lock=False,
)

if devices_values:
self.db_pool.simple_update_many_txn(
txn,
table="devices",
key_names=("user_id", "device_id"),
key_values=devices_keys,
value_names=("user_agent", "last_seen", "ip"),
value_values=devices_values,
)
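
As a rough illustration of the batching above (the entry below is invented sample data; only the shape matters), this is how a single pending entry in to_update fans out into the rows for the two batched calls:

# Illustration only: one made-up pending entry.
to_update = {
    ("@alice:example.org", "syt_access_token", "10.0.0.1"): (
        "Mozilla/5.0",
        "ALICEDEVICE",
        1_649_000_000_000,
    ),
}

user_ips_keys, user_ips_values = [], []
devices_keys, devices_values = [], []

for (user_id, access_token, ip), (user_agent, device_id, last_seen) in to_update.items():
    user_ips_keys.append((user_id, access_token, ip))
    user_ips_values.append((user_agent, device_id, last_seen))
    if device_id:
        # Only entries with a device end up in the `devices` update batch.
        devices_keys.append((user_id, device_id))
        devices_values.append((user_agent, last_seen, ip))

# All user_ips rows then go through one simple_upsert_many_txn call and all
# devices rows through one simple_update_many_txn call, instead of one
# simple_upsert_txn / simple_update_txn call per entry.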

async def get_last_client_ip_by_device(
self, user_id: str, device_id: Optional[str]
73 changes: 57 additions & 16 deletions tests/storage/test__base.py
@@ -14,7 +14,7 @@
# limitations under the License.

import secrets
from typing import Any, Dict, Generator, List, Tuple
from typing import Generator, Tuple

from twisted.test.proto_helpers import MemoryReactor

@@ -24,7 +24,7 @@
from tests import unittest


class UpsertManyTests(unittest.HomeserverTestCase):
class UpdateUpsertManyTests(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.storage = hs.get_datastores().main

@@ -46,9 +46,13 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
)
)

def _dump_to_tuple(
self, res: List[Dict[str, Any]]
) -> Generator[Tuple[int, str, str], None, None]:
def _dump_table_to_tuple(self) -> Generator[Tuple[int, str, str], None, None]:
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
)
)

for i in res:
yield (i["id"], i["username"], i["value"])

@@ -75,13 +79,8 @@ def test_upsert_many(self) -> None:
)

# Check results are what we expect
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
)
)
self.assertEqual(
set(self._dump_to_tuple(res)),
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "there")},
)

@@ -102,12 +101,54 @@ def test_upsert_many(self) -> None:
)

# Check results are what we expect
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "bleb")},
)

def test_simple_update_many(self):
"""
simple_update_many performs many updates at once.
"""
# First add some data.
self.get_success(
self.storage.db_pool.simple_insert_many(
table=self.table_name,
keys=("id", "username", "value"),
values=[(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")],
desc="insert",
)
)

# Check the data made it to the table
self.assertEqual(
set(self._dump_to_tuple(res)),
{(1, "user1", "hello"), (2, "user2", "bleb")},
set(self._dump_table_to_tuple()),
{(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")},
)

# Now use simple_update_many
self.get_success(
self.storage.db_pool.simple_update_many(
table=self.table_name,
key_names=("username",),
key_values=(
("alice",),
("bob",),
("stranger",),
),
value_names=("value",),
value_values=(
("aaa!",),
("bbb!",),
("???",),
),
desc="update_many1",
)
)

# Check the table is how we expect:
# charlie has been left alone
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "alice", "aaa!"), (2, "bob", "bbb!"), (3, "charlie", "C")},
)