This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise _update_client_ips_batch_txn to batch together database operations. #12252

Merged
25 commits, merged on Apr 8, 2022
Commits (25)
7cf8b2e
Upsert many client IPs at once, more efficiently
reivilibre Mar 18, 2022
c7cfbf5
Newsfile
reivilibre Mar 18, 2022
45c98e0
Add a simple update many txn
reivilibre Mar 24, 2022
0646b4b
Add non-txn simple update many
reivilibre Mar 24, 2022
1acd8cd
Use simple_update_many for devices
reivilibre Mar 25, 2022
1e961ad
Isolate locals by splitting into two smaller functions
reivilibre Mar 25, 2022
503f5c8
Make _dump_to_tuple dump the entire table without undue ceremony
reivilibre Mar 28, 2022
5f29099
Rename some things to clarify
reivilibre Mar 28, 2022
e7c4907
Add a test for simple_update_many
reivilibre Mar 28, 2022
5675a94
Add an exception for when the number of value rows and key rows don't…
reivilibre Mar 28, 2022
e7985d2
Antilint
reivilibre Apr 6, 2022
7edf6f7
Add lock=True for the emulated many-upsert case
reivilibre Apr 7, 2022
9556aae
Don't double-lock the table
reivilibre Apr 7, 2022
303fba6
Inline column names
reivilibre Apr 7, 2022
ac4b1d5
Flatten user_ips builder
reivilibre Apr 7, 2022
34403cb
Flatten devices builder
reivilibre Apr 7, 2022
0f8e98b
Fix up docstring
reivilibre Apr 7, 2022
481b730
Remove dead variable
reivilibre Apr 7, 2022
99e6b66
Don't return for None
reivilibre Apr 7, 2022
3f7f659
Update synapse/storage/database.py
reivilibre Apr 7, 2022
50f2b91
Bail out when there's nothing to do
reivilibre Apr 7, 2022
93e1237
Apply suggestions from code review
reivilibre Apr 8, 2022
c0aaec4
Antilint
reivilibre Apr 8, 2022
c456958
Lock only once for batch emulated upserts
reivilibre Apr 8, 2022
214aacf
No need to manually lock
reivilibre Apr 8, 2022
1 change: 1 addition & 0 deletions changelog.d/12252.feature
@@ -0,0 +1 @@
Move `update_client_ip` background job from the main process to the background worker.
101 changes: 98 additions & 3 deletions synapse/storage/database.py
@@ -1268,6 +1268,7 @@ async def simple_upsert_many(
value_names: Collection[str],
value_values: Collection[Collection[Any]],
desc: str,
lock: bool = True,
Member:

Note that I think this is fine to default to True since simple_upsert_txn_emulated (which this eventually calls) defaults to True. So pretty much this was the default behavior.

Contributor Author:

Yeah, exactly — this is just providing a way to opt out of it.

) -> None:
"""
Upsert, many times.
@@ -1279,21 +1280,24 @@ async def simple_upsert_many(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
"""

# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
)

return await self.runInteraction(
await self.runInteraction(
desc,
self.simple_upsert_many_txn,
table,
key_names,
key_values,
value_names,
value_values,
lock=lock,
db_autocommit=autocommit,
)

@@ -1305,6 +1309,7 @@ def simple_upsert_many_txn(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times.
@@ -1316,14 +1321,16 @@ def simple_upsert_many_txn(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
"""
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
else:
return self.simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values
txn, table, key_names, key_values, value_names, value_values, lock=lock
)

def simple_upsert_many_txn_emulated(
@@ -1334,6 +1341,7 @@ def simple_upsert_many_txn_emulated(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
@@ -1345,17 +1353,24 @@ def simple_upsert_many_txn_emulated(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]

if lock:
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, table)
Comment on lines +1364 to +1367
Member:

I don't think this code is used on postgres though since that doesn't need the emulated support here?

Contributor Author:

Well, sigh, you're absolutely right. Locking on SQLite is a no-op (apparently! I haven't fully reasoned through how that's sufficient, but the code is a no-op...?!).

  • Postgres: always supports native upsert, so locking doesn't apply.
  • SQLite: may not support native upsert, locking is a no-op.

Why do upserts have locking at all then? Maybe I made the mistake of assuming that there must have been a reason for it because of the code that was here before...

In one of the cases, an emulated upsert can be a SELECT followed by an INSERT. In the other case, it's an UPDATE followed by an INSERT...

The latter case (UPDATE, INSERT) doesn't sound like it would need a lock, because both of those operations need a PENDING lock. There's no way another writer can get in the middle of those two.

(Assuming traditional SQLite3 locking) The first case (SELECT, INSERT) may be problematic because SELECT only needs a SHARED lock. But it needs to upgrade to a PENDING lock when it begins the INSERT. If there's another PENDING or stronger lock, it will fail and will need to retry the transaction.

I can't tell whether this works properly in WAL mode or not; the WAL docs aren't particularly clear about how transactions interact like the old-style locking page is. That said, the page on transactions has this to say:

> A read transaction is used for reading only. A write transaction allows both reading and writing. A read transaction is started by a SELECT statement, and a write transaction is started by statements like CREATE, DELETE, DROP, INSERT, or UPDATE (collectively "write statements"). If a write statement occurs while a read transaction is active, then the read transaction is upgraded to a write transaction if possible. If some other database connection has already modified the database or is already in the process of modifying the database, then upgrading to a write transaction is not possible and the write statement will fail with SQLITE_BUSY.

Assuming that's still true for WAL mode, then I think it's safe on SQLite.


Ah! However, there's one case where Postgres DOES need the emulated support, so I need to take back some of the things I just wrote :/.
That's when the unique index on the table is not ready yet and the table is 'unsafe to upsert'. (This is only a notable effect with Postgres, since SQLite's indices block when adding; Postgres' are added concurrently sometimes). In that case, locking does serve a useful role because Postgres' MVCC means that duplicates could be introduced (and then lead to the failure of creating the index altogether).
Aside: user_directory_search is an eternal exception — it's always 'unsafe to upsert' on SQLite....

That turned into something more complex than I thought :/
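
For reference, the UPDATE-then-INSERT flavour described above looks roughly like this; a simplified sketch against a DB-API cursor (assuming non-empty keys and values), not the exact Synapse implementation:

def upsert_emulated(txn, table: str, keys: dict, values: dict) -> None:
    # Try an UPDATE first: if it touched a row, the key already existed.
    update_sql = "UPDATE {} SET {} WHERE {}".format(
        table,
        ", ".join(f"{k} = ?" for k in values),
        " AND ".join(f"{k} = ?" for k in keys),
    )
    txn.execute(update_sql, list(values.values()) + list(keys.values()))
    if txn.rowcount > 0:
        return

    # No row matched, so INSERT. Another writer could slip in between the
    # two statements; that window is what locking (where it is not a
    # no-op) guards against.
    cols = list(keys) + list(values)
    insert_sql = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(cols), ", ".join("?" for _ in cols)
    )
    txn.execute(insert_sql, list(keys.values()) + list(values.values()))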

Contributor Author (reivilibre), Apr 8, 2022:

> A read transaction is used for reading only. A write transaction allows both reading and writing. A read transaction is started by a SELECT statement, and a write transaction is started by statements like CREATE, DELETE, DROP, INSERT, or UPDATE (collectively "write statements"). If a write statement occurs while a read transaction is active, then the read transaction is upgraded to a write transaction if possible. If some other database connection has already modified the database or is already in the process of modifying the database, then upgrading to a write transaction is not possible and the write statement will fail with SQLITE_BUSY.
>
> Assuming that's still true for WAL mode, then I think it's safe on SQLite.

I tried this and it seems safe on both WAL and non-WAL mode.

The advantage of WAL mode in this case is that another transaction merely running 'select' and hanging around doesn't prevent you from committing. But that transaction will have to restart (at least if it's in conflict; not sure about generally) — it gets told the database is locked each time it tries to commit or update.

I learnt a bit about SQLite's WAL mode, so that's good at least
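
A minimal stand-alone demonstration of the behaviour in the quoted passage, using Python's sqlite3 module in WAL mode (illustrative only, with an invented table; not Synapse code):

import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

def connect() -> sqlite3.Connection:
    # isolation_level=None: no implicit transactions, we BEGIN/COMMIT by
    # hand. timeout=0: fail immediately with SQLITE_BUSY instead of waiting.
    return sqlite3.connect(path, isolation_level=None, timeout=0)

setup = connect()
setup.execute("PRAGMA journal_mode=WAL")
setup.execute("CREATE TABLE t (k TEXT PRIMARY KEY, v TEXT)")
setup.close()

reader = connect()
writer = connect()

reader.execute("BEGIN")
reader.execute("SELECT * FROM t").fetchall()  # starts a read transaction

# Another connection modifies the database while the read txn is open.
writer.execute("INSERT INTO t VALUES ('a', '1')")

try:
    # Upgrading the read transaction to a write transaction now fails,
    # exactly as the quoted documentation describes.
    reader.execute("INSERT INTO t VALUES ('b', '2')")
except sqlite3.OperationalError as exc:
    print(exc)  # "database is locked"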


for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
_vals = {x: y for x, y in zip(value_names, valv)}

self.simple_upsert_txn_emulated(txn, table, _keys, _vals)
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)

def simple_upsert_many_txn_native_upsert(
self,
@@ -1792,6 +1807,86 @@ def simple_update_txn(

return txn.rowcount

async def simple_update_many(
self,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
desc: str,
) -> None:
"""
Update, many times, using batching where possible.
If the keys don't match anything, nothing will be updated.

Args:
table: The table to update
key_names: The key column names.
key_values: A list of each row's key column values.
value_names: The names of value columns to update.
value_values: A list of each row's value column values.
"""

await self.runInteraction(
desc,
self.simple_update_many_txn,
table,
key_names,
key_values,
value_names,
value_values,
)

@staticmethod
def simple_update_many_txn(
txn: LoggingTransaction,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Collection[Iterable[Any]],
) -> None:
"""
Update, many times, using batching where possible.
If the keys don't match anything, nothing will be updated.

Args:
table: The table to update
key_names: The key column names.
key_values: A list of each row's key column values.
value_names: The names of value columns to update.
value_values: A list of each row's value column values.
"""

if len(value_values) != len(key_values):
raise ValueError(
f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number."
)

# List of tuples of (value values, then key values)
# (This matches the order needed for the query)
args = [tuple(x) + tuple(y) for x, y in zip(value_values, key_values)]

# 'col1 = ?, col2 = ?, ...'
set_clause = ", ".join(f"{n} = ?" for n in value_names)

if key_names:
# 'WHERE col3 = ? AND col4 = ? AND col5 = ?'
where_clause = "WHERE " + (" AND ".join(f"{n} = ?" for n in key_names))
else:
where_clause = ""

# UPDATE mytable SET col1 = ?, col2 = ? WHERE col3 = ? AND col4 = ?
sql = f"""
UPDATE {table} SET {set_clause} {where_clause}
"""

txn.execute_batch(sql, args)

async def simple_update_one(
self,
table: str,
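To make the parameter ordering concrete, here is a small stand-alone sketch of the statement and argument list that simple_update_many_txn builds; the table and rows are invented for illustration:

table = "devices"
key_names = ("user_id", "device_id")
key_values = [("@alice:test", "DEV1"), ("@bob:test", "DEV2")]
value_names = ("user_agent", "last_seen", "ip")
value_values = [("UA1", 1000, "1.2.3.4"), ("UA2", 2000, "5.6.7.8")]

# SET parameters come first in each args tuple, then WHERE parameters,
# matching "SET {value_names...} WHERE {key_names...}" in the SQL.
args = [tuple(vs) + tuple(ks) for vs, ks in zip(value_values, key_values)]
# [('UA1', 1000, '1.2.3.4', '@alice:test', 'DEV1'),
#  ('UA2', 2000, '5.6.7.8', '@bob:test', 'DEV2')]

set_clause = ", ".join(f"{n} = ?" for n in value_names)
where_clause = "WHERE " + " AND ".join(f"{n} = ?" for n in key_names)
sql = f"UPDATE {table} SET {set_clause} {where_clause}"
# UPDATE devices SET user_agent = ?, last_seen = ?, ip = ?
#     WHERE user_id = ? AND device_id = ?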
66 changes: 34 additions & 32 deletions synapse/storage/databases/main/client_ips.py
@@ -616,9 +616,10 @@ async def _update_client_ips_batch(self) -> None:
to_update = self._batch_row_update
self._batch_row_update = {}

await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
if to_update:
await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

def _update_client_ips_batch_txn(
self,
@@ -629,42 +630,43 @@ def _update_client_ips_batch_txn(
self._update_on_this_worker
), "This worker is not designated to update client IPs"

if "user_ips" in self.db_pool._unsafe_to_upsert_tables or (
not self.database_engine.can_native_upsert
):
self.database_engine.lock_table(txn, "user_ips")
# Keys and values for the `user_ips` upsert.
user_ips_keys = []
user_ips_values = []

# Keys and values for the `devices` update.
devices_keys = []
devices_values = []

for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry

self.db_pool.simple_upsert_txn(
txn,
table="user_ips",
keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
values={
"user_agent": user_agent,
"device_id": device_id,
"last_seen": last_seen,
},
lock=False,
)
user_ips_keys.append((user_id, access_token, ip))
user_ips_values.append((user_agent, device_id, last_seen))

# Technically an access token might not be associated with
# a device so we need to check.
if device_id:
# this is always an update rather than an upsert: the row should
# already exist, and if it doesn't, that may be because it has been
# deleted, and we don't want to re-create it.
self.db_pool.simple_update_txn(
txn,
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
updatevalues={
"user_agent": user_agent,
"last_seen": last_seen,
"ip": ip,
},
)
devices_keys.append((user_id, device_id))
devices_values.append((user_agent, last_seen, ip))

self.db_pool.simple_upsert_many_txn(
txn,
table="user_ips",
key_names=("user_id", "access_token", "ip"),
key_values=user_ips_keys,
value_names=("user_agent", "device_id", "last_seen"),
value_values=user_ips_values,
)

if devices_values:
self.db_pool.simple_update_many_txn(
txn,
table="devices",
key_names=("user_id", "device_id"),
key_values=devices_keys,
value_names=("user_agent", "last_seen", "ip"),
value_values=devices_values,
)

async def get_last_client_ip_by_device(
self, user_id: str, device_id: Optional[str]
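The shape of the change above (collect per-row tuples in the loop, then issue one batched statement) can be sketched stand-alone with sqlite3's executemany; invented rows, not the Synapse API:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE devices "
    "(user_id TEXT, device_id TEXT, user_agent TEXT, last_seen INTEGER, ip TEXT)"
)
conn.executemany(
    "INSERT INTO devices VALUES (?, ?, '', 0, '')",
    [("@alice:test", "DEV1"), ("@bob:test", "DEV2")],
)

# One parameterised UPDATE run over many rows, instead of building and
# executing a separate statement per device.
devices_keys = [("@alice:test", "DEV1"), ("@bob:test", "DEV2")]
devices_values = [("UA1", 1000, "1.2.3.4"), ("UA2", 2000, "5.6.7.8")]
conn.executemany(
    "UPDATE devices SET user_agent = ?, last_seen = ?, ip = ? "
    "WHERE user_id = ? AND device_id = ?",
    [vs + ks for vs, ks in zip(devices_values, devices_keys)],
)
print(conn.execute("SELECT * FROM devices").fetchall())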
73 changes: 57 additions & 16 deletions tests/storage/test__base.py
@@ -14,7 +14,7 @@
# limitations under the License.

import secrets
from typing import Any, Dict, Generator, List, Tuple
from typing import Generator, Tuple

from twisted.test.proto_helpers import MemoryReactor

@@ -24,7 +24,7 @@
from tests import unittest


class UpsertManyTests(unittest.HomeserverTestCase):
class UpdateUpsertManyTests(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.storage = hs.get_datastores().main

@@ -46,9 +46,13 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
)
)

def _dump_to_tuple(
self, res: List[Dict[str, Any]]
) -> Generator[Tuple[int, str, str], None, None]:
def _dump_table_to_tuple(self) -> Generator[Tuple[int, str, str], None, None]:
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
)
)

for i in res:
yield (i["id"], i["username"], i["value"])

@@ -75,13 +79,8 @@ def test_upsert_many(self) -> None:
)

# Check results are what we expect
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
)
)
self.assertEqual(
set(self._dump_to_tuple(res)),
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "there")},
)

@@ -102,12 +101,54 @@ def test_upsert_many(self) -> None:
)

# Check results are what we expect
res = self.get_success(
self.storage.db_pool.simple_select_list(
self.table_name, None, ["id, username, value"]
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "bleb")},
)

def test_simple_update_many(self):
"""
simple_update_many performs many updates at once.
"""
# First add some data.
self.get_success(
self.storage.db_pool.simple_insert_many(
table=self.table_name,
keys=("id", "username", "value"),
values=[(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")],
desc="insert",
)
)

# Check the data made it to the table
self.assertEqual(
set(self._dump_to_tuple(res)),
{(1, "user1", "hello"), (2, "user2", "bleb")},
set(self._dump_table_to_tuple()),
{(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")},
)

# Now use simple_update_many
self.get_success(
self.storage.db_pool.simple_update_many(
table=self.table_name,
key_names=("username",),
key_values=(
("alice",),
("bob",),
("stranger",),
),
value_names=("value",),
value_values=(
("aaa!",),
("bbb!",),
("???",),
),
desc="update_many1",
)
)

# Check the table is how we expect:
# charlie has been left alone
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "alice", "aaa!"), (2, "bob", "bbb!"), (3, "charlie", "C")},
)