Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert pusher databases to async/await. #8075

Merged
merged 6 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8075.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
9 changes: 5 additions & 4 deletions synapse/rest/client/v1/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from synapse.api.errors import (
NotFoundError,
StoreError,
Expand Down Expand Up @@ -163,7 +162,7 @@ def notify_user(self, user_id):
stream_id, _ = self.store.get_push_rules_stream_token()
self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id])

def set_rule_attr(self, user_id, spec, val):
async def set_rule_attr(self, user_id, spec, val):
if spec["attr"] == "enabled":
if isinstance(val, dict) and "enabled" in val:
val = val["enabled"]
Expand All @@ -173,7 +172,9 @@ def set_rule_attr(self, user_id, spec, val):
# bools directly, so let's not break them.
raise SynapseError(400, "Value for 'enabled' must be boolean")
namespaced_rule_id = _namespaced_rule_id_from_spec(spec)
return self.store.set_push_rule_enabled(user_id, namespaced_rule_id, val)
return await self.store.set_push_rule_enabled(
user_id, namespaced_rule_id, val
)
elif spec["attr"] == "actions":
actions = val.get("actions")
_check_actions(actions)
Expand All @@ -188,7 +189,7 @@ def set_rule_attr(self, user_id, spec, val):

if namespaced_rule_id not in rule_ids:
raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,))
return self.store.set_push_rule_actions(
return await self.store.set_push_rule_actions(
user_id, namespaced_rule_id, actions, is_default_rule
)
else:
Expand Down
80 changes: 38 additions & 42 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import ChainedIdGenerator
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -115,9 +115,9 @@ def get_max_push_rules_stream_id(self):
"""
raise NotImplementedError()

@cachedInlineCallbacks(max_entries=5000)
def get_push_rules_for_user(self, user_id):
rows = yield self.db_pool.simple_select_list(
@cached(max_entries=5000)
async def get_push_rules_for_user(self, user_id):
rows = await self.db_pool.simple_select_list(
table="push_rules",
keyvalues={"user_name": user_id},
retcols=(
Expand All @@ -133,17 +133,15 @@ def get_push_rules_for_user(self, user_id):

rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))

enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
enabled_map = await self.get_push_rules_enabled_for_user(user_id)

use_new_defaults = user_id in self._users_new_default_push_rules

rules = _load_rules(rows, enabled_map, use_new_defaults)
return _load_rules(rows, enabled_map, use_new_defaults)

return rules

@cachedInlineCallbacks(max_entries=5000)
def get_push_rules_enabled_for_user(self, user_id):
results = yield self.db_pool.simple_select_list(
@cached(max_entries=5000)
async def get_push_rules_enabled_for_user(self, user_id):
results = await self.db_pool.simple_select_list(
table="push_rules_enable",
keyvalues={"user_name": user_id},
retcols=("user_name", "rule_id", "enabled"),
Expand Down Expand Up @@ -205,14 +203,15 @@ def bulk_get_push_rules(self, user_ids):

return results

@defer.inlineCallbacks
def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
async def copy_push_rule_from_room_to_room(
self, new_room_id: str, user_id: str, rule: dict
) -> None:
"""Copy a single push rule from one room to another for a specific user.

Args:
new_room_id (str): ID of the new room.
user_id (str): ID of user the push rule belongs to.
rule (Dict): A push rule.
new_room_id: ID of the new room.
user_id : ID of user the push rule belongs to.
rule: A push rule.
"""
# Create new rule id
rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
Expand All @@ -224,28 +223,27 @@ def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
condition["pattern"] = new_room_id

# Add the rule for the new room
yield self.add_push_rule(
await self.add_push_rule(
user_id=user_id,
rule_id=new_rule_id,
priority_class=rule["priority_class"],
conditions=rule["conditions"],
actions=rule["actions"],
)

@defer.inlineCallbacks
def copy_push_rules_from_room_to_room_for_user(
self, old_room_id, new_room_id, user_id
):
async def copy_push_rules_from_room_to_room_for_user(
self, old_room_id: str, new_room_id: str, user_id: str
) -> None:
"""Copy all of the push rules from one room to another for a specific
user.

Args:
old_room_id (str): ID of the old room.
new_room_id (str): ID of the new room.
user_id (str): ID of user to copy push rules for.
old_room_id: ID of the old room.
new_room_id: ID of the new room.
user_id: ID of user to copy push rules for.
"""
# Retrieve push rules for this user
user_push_rules = yield self.get_push_rules_for_user(user_id)
user_push_rules = await self.get_push_rules_for_user(user_id)

# Get rules relating to the old room and copy them to the new room
for rule in user_push_rules:
Expand All @@ -254,7 +252,7 @@ def copy_push_rules_from_room_to_room_for_user(
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
for c in conditions
):
yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule)
await self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule)

@cachedList(
cached_method_name="get_push_rules_enabled_for_user",
Expand Down Expand Up @@ -332,8 +330,7 @@ def get_all_push_rule_updates_txn(txn):


class PushRuleStore(PushRulesWorkerStore):
@defer.inlineCallbacks
def add_push_rule(
async def add_push_rule(
self,
user_id,
rule_id,
Expand All @@ -342,13 +339,13 @@ def add_push_rule(
actions,
before=None,
after=None,
):
) -> None:
conditions_json = json_encoder.encode(conditions)
actions_json = json_encoder.encode(actions)
with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
if before or after:
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_add_push_rule_relative_txn",
self._add_push_rule_relative_txn,
stream_id,
Expand All @@ -362,7 +359,7 @@ def add_push_rule(
after,
)
else:
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_add_push_rule_highest_priority_txn",
self._add_push_rule_highest_priority_txn,
stream_id,
Expand Down Expand Up @@ -546,16 +543,15 @@ def _upsert_push_rule_txn(
},
)

@defer.inlineCallbacks
def delete_push_rule(self, user_id, rule_id):
async def delete_push_rule(self, user_id: str, rule_id: str) -> None:
"""
Delete a push rule. Args specify the row to be deleted and can be
any of the columns in the push_rule table, but below are the
standard ones

Args:
user_id (str): The matrix ID of the push rule owner
rule_id (str): The rule_id of the rule to be deleted
user_id: The matrix ID of the push rule owner
rule_id: The rule_id of the rule to be deleted
"""

def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
Expand All @@ -569,18 +565,17 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):

with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"delete_push_rule",
delete_push_rule_txn,
stream_id,
event_stream_ordering,
)

@defer.inlineCallbacks
def set_push_rule_enabled(self, user_id, rule_id, enabled):
async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None:
with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_set_push_rule_enabled_txn",
self._set_push_rule_enabled_txn,
stream_id,
Expand Down Expand Up @@ -611,8 +606,9 @@ def _set_push_rule_enabled_txn(
op="ENABLE" if enabled else "DISABLE",
)

@defer.inlineCallbacks
def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule):
async def set_push_rule_actions(
self, user_id, rule_id, actions, is_default_rule
) -> None:
actions_json = json_encoder.encode(actions)

def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
Expand Down Expand Up @@ -653,7 +649,7 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):

with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
yield self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"set_push_rule_actions",
set_push_rule_actions_txn,
stream_id,
Expand Down
Loading