Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database. #7636

Merged
merged 13 commits into from
Jun 16, 2020
Merged
1 change: 1 addition & 0 deletions changelog.d/7636.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor getting replication updates from database.
10 changes: 7 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import abc
import logging
from contextlib import contextmanager
from typing import Dict, Iterable, List, Set
from typing import Dict, Iterable, List, Set, Tuple

from six import iteritems, itervalues

Expand Down Expand Up @@ -775,7 +775,9 @@ async def is_visible(self, observed_user, observer_user):

return False

async def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
Expand All @@ -790,7 +792,9 @@ async def get_all_presence_updates(self, last_id, current_id, limit):
"""
# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
rows = await self.store.get_all_presence_updates(
instance_name, last_id, current_id, limit
)
return rows

def notify_new_event(self):
Expand Down
12 changes: 6 additions & 6 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import List
from typing import List, Tuple

from twisted.internet import defer

Expand Down Expand Up @@ -259,14 +259,14 @@ def _push_update_local(self, member, typing):
)

async def get_all_typing_updates(
self, last_id: int, current_id: int, limit: int
) -> List[dict]:
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get up to `limit` typing updates between the given tokens, earliest
updates first.
"""

if last_id == current_id:
return []
return [], current_id, False

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
Expand All @@ -280,9 +280,9 @@ async def get_all_typing_updates(
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.append((serial, (room_id, list(typing))))
rows.sort()
return rows[:limit]
return rows[:limit], current_id, len(rows) > limit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is doing the wrong thing for the returned token when the limit is hit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am such a crank 🤦

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAYBE THIS TIME I'VE fIXeD IT?!!!??!1?


def get_current_token(self):
    """Return the current position of the typing stream, i.e. the latest
    room serial that has been assigned.
    """
    return self._latest_room_serial
Expand Down
4 changes: 1 addition & 3 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,9 @@ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
updated_receipts = yield self.store.get_all_updated_receipts(
users_affected = yield self.store.get_users_sent_receipts_between(
min_stream_id - 1, max_stream_id
)
# This returns a tuple, user_id is at index 3
users_affected = {r[3] for r in updated_receipts}

for u in users_affected:
if u in self.pushers:
Expand Down
29 changes: 8 additions & 21 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
db_query_to_update_function(store.get_all_new_backfill_event_rows),
store.get_all_new_backfill_event_rows,
)


Expand All @@ -291,9 +291,7 @@ def __init__(self, hs):
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
update_function = db_query_to_update_function(
presence_handler.get_all_presence_updates
)
update_function = presence_handler.get_all_presence_updates
richvdh marked this conversation as resolved.
Show resolved Hide resolved
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand All @@ -318,9 +316,7 @@ def __init__(self, hs):

if hs.config.worker_app is None:
# on the master, query the typing handler
update_function = db_query_to_update_function(
typing_handler.get_all_typing_updates
)
update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand Down Expand Up @@ -352,7 +348,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_receipt_stream_id),
db_query_to_update_function(store.get_all_updated_receipts),
store.get_all_updated_receipts,
)


Expand All @@ -367,26 +363,17 @@ class PushRulesStream(Stream):

def __init__(self, hs):
self.store = hs.get_datastore()

super(PushRulesStream, self).__init__(
hs.get_instance_name(), self._current_token, self._update_function
hs.get_instance_name(),
self._current_token,
self.store.get_all_push_rule_updates,
)

def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token

async def _update_function(
self, instance_name: str, from_token: Token, to_token: Token, limit: int
):
rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)

limited = False
if len(rows) == limit:
to_token = rows[-1][0]
limited = True

return [(row[0], (row[2],)) for row in rows], to_token, limited


class PushersStream(Stream):
"""A user has added/changed/removed a pusher
Expand Down
38 changes: 32 additions & 6 deletions synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,9 +1077,29 @@ def get_ex_outlier_stream_rows_txn(txn):
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)

def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
async def get_all_new_backfill_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for backfill replication stream, including all new
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this docstring is very helpful, but please can all the updated/new storage methods have one, not just this method?

backfilled events and events that have gone from being outliers to not.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token that can be used to
fetch subsequent updates, and whether we returned fewer rows than exist
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you rephrase "the position of the rows returned up to"? it's somewhat unclear: inconsistent use of "position" and "token", inclusive or exclusive, etc.

"the last token included in the results", maybe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully I've clarified it. Unfortunately, technically, the returned token doesn't have to be the last token included in the results (since current_id could be greater due to a gap)

between the requested tokens due to the limit.

The updates are list of 2-tuples of stream ID and the row.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_new_backfill_event_rows(txn):
sql = (
Expand All @@ -1094,10 +1114,12 @@ def get_all_new_backfill_event_rows(txn):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
new_event_updates = txn.fetchall()
new_event_updates = [(row[0], row[1:]) for row in txn]

limited = False
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
limited = True
else:
upper_bound = current_id

Expand All @@ -1114,11 +1136,15 @@ def get_all_new_backfill_event_rows(txn):
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
new_event_updates.extend(txn.fetchall())
new_event_updates.extend((row[0], row[1:]) for row in txn)

if len(new_event_updates) >= limit:
upper_bound = new_event_updates[-1][0]
limited = True

return new_event_updates
return new_event_updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)

Expand Down
20 changes: 16 additions & 4 deletions synapse/storage/data_stores/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Tuple

from twisted.internet import defer

from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
Expand Down Expand Up @@ -73,9 +75,11 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
)
txn.execute(sql + clause, [stream_id] + list(args))

def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_presence_updates_txn(txn):
sql = """
Expand All @@ -89,9 +93,17 @@ def get_all_presence_updates_txn(txn):
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()
updates = [(row[0], row[1:]) for row in txn]

upper_bound = current_id
limited = False
if len(updates) >= limit:
upper_bound = updates[-1][0]
limited = True

return updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_presence_updates", get_all_presence_updates_txn
)

Expand Down
34 changes: 22 additions & 12 deletions synapse/storage/data_stores/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import abc
import logging
from typing import Union
from typing import List, Tuple, Union

from canonicaljson import json

Expand Down Expand Up @@ -348,23 +348,33 @@ def bulk_get_push_rules_enabled(self, user_ids):
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
return results

async def get_all_push_rule_updates(
    self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
    """Get all the push rule changes that have happened on the server.

    Args:
        instance_name: The writer we want to fetch updates from. Unused
            here since there is only ever one writer.
        last_id: The token to fetch updates from. Exclusive.
        current_id: The token to fetch updates up to. Inclusive.
        limit: The requested limit for the number of rows to return. The
            function may return more or fewer rows.

    Returns:
        A tuple consisting of: the updates, a token that can be used to
        fetch subsequent updates, and whether we returned fewer rows than
        exist between the requested tokens due to the limit.

        The updates are a list of 2-tuples of stream ID and the row data
        (a 1-tuple containing the user id of the changed rule).
    """
    if last_id == current_id:
        # Nothing can have changed in an empty token range.
        return [], current_id, False

    def get_all_push_rule_updates_txn(txn):
        # Only stream_id and user_id are needed by the replication
        # consumers; the other columns in push_rules_stream are ignored.
        sql = """
            SELECT stream_id, user_id
            FROM push_rules_stream
            WHERE ? < stream_id AND stream_id <= ?
            ORDER BY stream_id ASC
            LIMIT ?
        """
        txn.execute(sql, (last_id, current_id, limit))
        updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]

        limited = False
        upper_bound = current_id
        if len(updates) == limit:
            # We hit the limit, so report the stream position of the last
            # row we returned so the caller can resume from there rather
            # than skipping ahead to current_id.
            limited = True
            upper_bound = updates[-1][0]

        return updates, upper_bound, limited

    return await self.db.runInteraction(
        "get_all_push_rule_updates", get_all_push_rule_updates_txn
    )

Expand Down
61 changes: 47 additions & 14 deletions synapse/storage/data_stores/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import abc
import logging
from typing import List, Tuple

from canonicaljson import json

Expand Down Expand Up @@ -266,26 +267,58 @@ def f(txn):
}
return results

def get_users_sent_receipts_between(self, last_id: int, current_id: int):
    """Get all users who sent receipts between `last_id` exclusive and
    `current_id` inclusive.

    Args:
        last_id: The token to fetch updates from. Exclusive.
        current_id: The token to fetch updates up to. Inclusive.

    Returns:
        Deferred[List[str]]: the user IDs that sent receipts in the range.
    """

    if last_id == current_id:
        # Empty token range, so no users can have sent receipts.
        return defer.succeed([])

    def _get_users_sent_receipts_between_txn(txn):
        sql = """
            SELECT DISTINCT user_id FROM receipts_linearized
            WHERE ? < stream_id AND stream_id <= ?
        """
        txn.execute(sql, (last_id, current_id))

        return [r[0] for r in txn]

    return self.db.runInteraction(
        "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
    )

async def get_all_updated_receipts(
    self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
    """Get updates for the receipts replication stream.

    Args:
        instance_name: The writer we want to fetch updates from. Unused
            here since there is only ever one writer.
        last_id: The token to fetch updates from. Exclusive.
        current_id: The token to fetch updates up to. Inclusive.
        limit: The requested limit for the number of rows to return. The
            function may return more or fewer rows.

    Returns:
        A tuple consisting of: the updates, a token that can be used to
        fetch subsequent updates, and whether we returned fewer rows than
        exist between the requested tokens due to the limit.

        The updates are a list of 2-tuples of stream ID and the row data
        (room_id, receipt_type, user_id, event_id, parsed JSON data).
    """
    if last_id == current_id:
        # Empty token range: nothing to return.
        return [], current_id, False

    def get_all_updated_receipts_txn(txn):
        sql = """
            SELECT stream_id, room_id, receipt_type, user_id, event_id, data
            FROM receipts_linearized
            WHERE ? < stream_id AND stream_id <= ?
            ORDER BY stream_id ASC
            LIMIT ?
        """
        txn.execute(sql, (last_id, current_id, limit))

        # The `data` column is stored as JSON text, so decode it here.
        updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]

        limited = False
        upper_bound = current_id

        if len(updates) == limit:
            # We hit the limit: report the stream position of the last row
            # returned so the caller can resume from there.
            limited = True
            upper_bound = updates[-1][0]

        return updates, upper_bound, limited

    return await self.db.runInteraction(
        "get_all_updated_receipts", get_all_updated_receipts_txn
    )

Expand Down