Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database v2. #7740

Merged
merged 13 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7740.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor getting replication updates from database.
56 changes: 10 additions & 46 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,26 +198,6 @@ def current_token_without_instance(
return lambda instance_name: current_token()


def db_query_to_update_function(
query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]]
) -> UpdateFunction:
"""Wraps a db query function which returns a list of rows to make it
suitable for use as an `update_function` for the Stream class
"""

async def update_function(instance_name, from_token, upto_token, limit):
rows = await query_function(from_token, upto_token, limit)
updates = [(row[0], row[1:]) for row in rows]
limited = False
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return update_function


def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
"""Makes a suitable function for use as an `update_function` that queries
the master process for updates.
Expand Down Expand Up @@ -393,7 +373,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_pushers_stream_token),
db_query_to_update_function(store.get_all_updated_pushers_rows),
store.get_all_updated_pushers_rows,
)


Expand Down Expand Up @@ -421,26 +401,12 @@ class CachesStreamRow:
ROW_TYPE = CachesStreamRow

def __init__(self, hs):
self.store = hs.get_datastore()
store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
self.store.get_cache_stream_token,
self._update_function,
)

async def _update_function(
self, instance_name: str, from_token: int, upto_token: int, limit: int
):
rows = await self.store.get_all_updated_caches(
instance_name, from_token, upto_token, limit
store.get_cache_stream_token,
store.get_all_updated_caches,
)
updates = [(row[0], row[1:]) for row in rows]
limited = False
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited


class PublicRoomsStream(Stream):
Expand All @@ -465,7 +431,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_public_room_stream_id),
db_query_to_update_function(store.get_all_new_public_rooms),
store.get_all_new_public_rooms,
)


Expand All @@ -486,7 +452,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_device_stream_token),
db_query_to_update_function(store.get_all_device_list_changes_for_remotes),
store.get_all_device_list_changes_for_remotes,
)


Expand All @@ -504,7 +470,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_to_device_stream_token),
db_query_to_update_function(store.get_all_new_device_messages),
store.get_all_new_device_messages,
)


Expand All @@ -524,7 +490,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_account_data_stream_id),
db_query_to_update_function(store.get_all_updated_tags),
store.get_all_updated_tags,
)


Expand Down Expand Up @@ -612,7 +578,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_group_stream_token),
db_query_to_update_function(store.get_all_groups_changes),
store.get_all_groups_changes,
)


Expand All @@ -630,7 +596,5 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_device_stream_token),
db_query_to_update_function(
store.get_all_user_signature_changes_for_remotes
),
store.get_all_user_signature_changes_for_remotes,
)
36 changes: 30 additions & 6 deletions synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import itertools
import logging
from typing import Any, Iterable, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple

from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams.events import (
Expand Down Expand Up @@ -44,13 +44,30 @@ def __init__(self, database: Database, db_conn, hs):

async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
):
"""Fetches cache invalidation rows between the two given IDs written
by the given instance. Returns at most `limit` rows.
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get updates for caches replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updatees.

The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return []
return [], current_id, False

def get_all_updated_caches_txn(txn):
# We purposefully don't bound by the current token, as we want to
Expand All @@ -64,7 +81,14 @@ def get_all_updated_caches_txn(txn):
LIMIT ?
"""
txn.execute(sql, (last_id, instance_name, limit))
return txn.fetchall()
updates = [(row[0], row[1:]) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#7636 seems to use a mixture of >, >= and == to determine if the stream was limited. I'm not sure if this inconsistency is on purpose or not. I would have thought > would be the proper choice here to avoid unnecessary work, but maybe that's wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, sorry. It mostly depends on the SQL query. Some are LIMIT ? and so will only ever return at most limit rows. Some have more fuzzy bounds and so could return more. I don't think anywhere uses >? If they do it sounds like a bug

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, typing uses >, but that is a special case because we have all results in memory and don't query the DB with a limit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Is this worth a comment in any of the locations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment to the typing one 👍

upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db.runInteraction(
"get_all_updated_caches", get_all_updated_caches_txn
Expand Down
54 changes: 38 additions & 16 deletions synapse/storage/data_stores/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import List, Tuple

from canonicaljson import json

Expand Down Expand Up @@ -207,47 +208,68 @@ def delete_messages_for_remote_destination_txn(txn):
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)

def get_all_new_device_messages(self, last_pos, current_pos, limit):
"""
async def get_all_new_device_messages(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get updates for to device replication stream.

Args:
last_pos(int):
current_pos(int):
limit(int):
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A deferred list of rows from the device inbox
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updatees.

The updates are a list of 2-tuples of stream ID and the row data
"""
if last_pos == current_pos:
return defer.succeed([])

if last_id == current_id:
return [], current_id, False

def get_all_new_device_messages_txn(txn):
# We limit like this as we might have multiple rows per stream_id, and
# we want to make sure we always get all entries for any stream_id
# we return.
upper_pos = min(current_pos, last_pos + limit)
upper_pos = min(current_id, last_id + limit)
sql = (
"SELECT max(stream_id), user_id"
" FROM device_inbox"
" WHERE ? < stream_id AND stream_id <= ?"
" GROUP BY user_id"
)
txn.execute(sql, (last_pos, upper_pos))
rows = txn.fetchall()
txn.execute(sql, (last_id, upper_pos))
updates = [(row[0], row[1:]) for row in txn]

sql = (
"SELECT max(stream_id), destination"
" FROM device_federation_outbox"
" WHERE ? < stream_id AND stream_id <= ?"
" GROUP BY destination"
)
txn.execute(sql, (last_pos, upper_pos))
rows.extend(txn)
txn.execute(sql, (last_id, upper_pos))
updates.extend((row[0], row[1:]) for row in txn)

# Order by ascending stream ordering
rows.sort()
updates.sort()

return rows
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return self.db.runInteraction(
return updates, upto_token, limited

return await self.db.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
)

Expand Down
70 changes: 48 additions & 22 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,32 +582,58 @@ def get_users_whose_signatures_changed(self, user_id, from_key):
return set()

async def get_all_device_list_changes_for_remotes(
self, from_key: int, to_key: int, limit: int,
) -> List[Tuple[int, str]]:
"""Return a list of `(stream_id, entity)` which is the combined list of
changes to devices and which destinations need to be poked. Entity is
either a user ID (starting with '@') or a remote destination.
"""
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get updates for device lists replication stream.

# This query Does The Right Thing where it'll correctly apply the
# bounds to the inner queries.
sql = """
SELECT stream_id, entity FROM (
SELECT stream_id, user_id AS entity FROM device_lists_stream
UNION ALL
SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
) AS e
WHERE ? < stream_id AND stream_id <= ?
LIMIT ?
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updatees.

The updates are a list of 2-tuples of stream ID and the row data
"""

return await self.db.execute(
if last_id == current_id:
return [], current_id, False

def _get_all_device_list_changes_for_remotes(txn):
# This query Does The Right Thing where it'll correctly apply the
# bounds to the inner queries.
sql = """
SELECT stream_id, entity FROM (
SELECT stream_id, user_id AS entity FROM device_lists_stream
UNION ALL
SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
) AS e
WHERE ? < stream_id AND stream_id <= ?
LIMIT ?
"""

txn.execute(sql, (last_id, current_id, limit))
updates = [(row[0], row[1:]) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db.runInteraction(
"get_all_device_list_changes_for_remotes",
None,
sql,
from_key,
to_key,
limit,
_get_all_device_list_changes_for_remotes,
)

@cached(max_entries=10000)
Expand Down
Loading