Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database. #7636

Merged
merged 13 commits into from
Jun 16, 2020
Merged
17 changes: 4 additions & 13 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,26 +363,17 @@ class PushRulesStream(Stream):

def __init__(self, hs):
self.store = hs.get_datastore()

super(PushRulesStream, self).__init__(
hs.get_instance_name(), self._current_token, self._update_function
hs.get_instance_name(),
self._current_token,
self.store.get_all_push_rule_updates,
)

def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token

async def _update_function(
self, instance_name: str, from_token: Token, to_token: Token, limit: int
):
rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)

limited = False
if len(rows) == limit:
to_token = rows[-1][0]
limited = True

return [(row[0], (row[2],)) for row in rows], to_token, limited


class PushersStream(Stream):
"""A user has added/changed/removed a pusher
Expand Down
34 changes: 22 additions & 12 deletions synapse/storage/data_stores/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import abc
import logging
from typing import Union
from typing import List, Tuple, Union

from canonicaljson import json

Expand Down Expand Up @@ -348,23 +348,33 @@ def bulk_get_push_rules_enabled(self, user_ids):
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
return results

def get_all_push_rule_updates(self, last_id, current_id, limit):
async def get_all_push_rule_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get all the push rules changes that have happend on the server"""
if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_push_rule_updates_txn(txn):
sql = (
"SELECT stream_id, event_stream_ordering, user_id, rule_id,"
" op, priority_class, priority, conditions, actions"
" FROM push_rules_stream"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
sql = """
SELECT stream_id, user_id
FROM push_rules_stream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we stuffing all those columns into the table if we don't care about them... 😕

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, hmm, it does look like they're not used anywhere else either. Though I don't really propose doing anything in this PR.

WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()
updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]

limited = False
upper_bound = current_id
if len(updates) == limit:
limited = True
upper_bound = updates[-1][0]

return updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_push_rule_updates", get_all_push_rule_updates_txn
)

Expand Down