From 096aea6a8bad4560aac5c0ecbdf8065914134023 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 12 Aug 2020 16:13:30 -0400 Subject: [PATCH 1/6] Convert pusher to async/await. --- synapse/storage/databases/main/pusher.py | 95 ++++++++++++------------ 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index b5200fbe79cc..ac72634178e1 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -19,10 +19,8 @@ from canonicaljson import encode_canonical_json -from twisted.internet import defer - from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList logger = logging.getLogger(__name__) @@ -34,23 +32,22 @@ def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[dict]: Drops any rows whose data cannot be decoded """ for r in rows: - dataJson = r["data"] + data_json = r["data"] try: - r["data"] = db_to_json(dataJson) + r["data"] = db_to_json(data_json) except Exception as e: logger.warning( "Invalid JSON in data for pusher %d: %s, %s", r["id"], - dataJson, + data_json, e.args[0], ) continue yield r - @defer.inlineCallbacks - def user_has_pusher(self, user_id): - ret = yield self.db_pool.simple_select_one_onecol( + async def user_has_pusher(self, user_id): + ret = await self.db_pool.simple_select_one_onecol( "pushers", {"user_name": user_id}, "id", allow_none=True ) return ret is not None @@ -61,9 +58,8 @@ def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): def get_pushers_by_user_id(self, user_id): return self.get_pushers_by({"user_name": user_id}) - @defer.inlineCallbacks - def get_pushers_by(self, keyvalues): - ret = yield self.db_pool.simple_select_list( + async def get_pushers_by(self, keyvalues): + ret = await self.db_pool.simple_select_list( "pushers", keyvalues, [ @@ -87,16 +83,14 @@ def get_pushers_by(self, keyvalues): ) return self._decode_pushers_rows(ret) - @defer.inlineCallbacks - def get_all_pushers(self): + async def get_all_pushers(self): def get_pushers(txn): txn.execute("SELECT * FROM pushers") rows = self.db_pool.cursor_to_dict(txn) return self._decode_pushers_rows(rows) - rows = yield self.db_pool.runInteraction("get_all_pushers", get_pushers) - return rows + return await self.db_pool.runInteraction("get_all_pushers", get_pushers) async def get_all_updated_pushers_rows( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -164,8 +158,8 @@ def get_all_updated_pushers_rows_txn(txn): "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) - @cachedInlineCallbacks(num_args=1, max_entries=15000) - def get_if_user_has_pusher(self, user_id): + @cached(num_args=1, max_entries=15000) + async def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator raise NotImplementedError() @@ -189,34 +183,38 @@ def get_if_users_have_pushers(self, user_ids): return result - @defer.inlineCallbacks - def update_pusher_last_stream_ordering( + async def update_pusher_last_stream_ordering( self, app_id, pushkey, user_id, last_stream_ordering - ): - yield self.db_pool.simple_update_one( + ) -> None: + await self.db_pool.simple_update_one( "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, {"last_stream_ordering": last_stream_ordering}, desc="update_pusher_last_stream_ordering", ) - @defer.inlineCallbacks - def 
update_pusher_last_stream_ordering_and_success( - self, app_id, pushkey, user_id, last_stream_ordering, last_success - ): + async def update_pusher_last_stream_ordering_and_success( + self, + app_id: str, + pushkey: str, + user_id: str, + last_stream_ordering: int, + last_success: int, + ) -> bool: """Update the last stream ordering position we've processed up to for the given pusher. Args: - app_id (str) - pushkey (str) - last_stream_ordering (int) - last_success (int) + app_id + pushkey + user_id + last_stream_ordering + last_success Returns: - Deferred[bool]: True if the pusher still exists; False if it has been deleted. + True if the pusher still exists; False if it has been deleted. """ - updated = yield self.db_pool.simple_update( + updated = await self.db_pool.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={ @@ -228,18 +226,18 @@ def update_pusher_last_stream_ordering_and_success( return bool(updated) - @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self.db_pool.simple_update( + async def update_pusher_failing_since( + self, app_id, pushkey, user_id, failing_since + ) -> None: + await self.db_pool.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={"failing_since": failing_since}, desc="update_pusher_failing_since", ) - @defer.inlineCallbacks - def get_throttle_params_by_room(self, pusher_id): - res = yield self.db_pool.simple_select_list( + async def get_throttle_params_by_room(self, pusher_id): + res = await self.db_pool.simple_select_list( "pusher_throttle", {"pusher": pusher_id}, ["room_id", "last_sent_ts", "throttle_ms"], @@ -255,11 +253,10 @@ def get_throttle_params_by_room(self, pusher_id): return params_by_room - @defer.inlineCallbacks - def set_throttle_params(self, pusher_id, room_id, params): + async def set_throttle_params(self, pusher_id, room_id, params) -> None: # no need to lock because `pusher_throttle` has a primary key on # (pusher, room_id) so simple_upsert will retry - yield self.db_pool.simple_upsert( + await self.db_pool.simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, params, @@ -272,8 +269,7 @@ class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self): return self._pushers_id_gen.get_current_token() - @defer.inlineCallbacks - def add_pusher( + async def add_pusher( self, user_id, access_token, @@ -287,11 +283,11 @@ def add_pusher( data, last_stream_ordering, profile_tag="", - ): + ) -> None: with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so simple_upsert will retry - yield self.db_pool.simple_upsert( + await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ @@ -316,15 +312,16 @@ def add_pusher( if user_has_pusher is not True: # invalidate, since we the user might not have had a pusher before - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "add_pusher", self._invalidate_cache_and_stream, self.get_if_user_has_pusher, (user_id,), ) - @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): + async def delete_pusher_by_app_id_pushkey_user_id( + self, app_id, pushkey, user_id + ) -> None: def delete_pusher_txn(txn, stream_id): self._invalidate_cache_and_stream( txn, 
self.get_if_user_has_pusher, (user_id,) @@ -351,6 +348,6 @@ def delete_pusher_txn(txn, stream_id): ) with self._pushers_id_gen.get_next() as stream_id: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_pusher", delete_pusher_txn, stream_id ) From b13c3f7b9c299b1da361c7370e5ddac2a126bc6f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 12 Aug 2020 16:19:23 -0400 Subject: [PATCH 2/6] Convert push rule to async. --- synapse/rest/client/v1/push_rule.py | 3 +- synapse/storage/databases/main/push_rule.py | 74 ++++++++++----------- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 00831879f387..0cd365082422 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Awaitable from synapse.api.errors import ( NotFoundError, @@ -163,7 +164,7 @@ def notify_user(self, user_id): stream_id, _ = self.store.get_push_rules_stream_token() self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) - def set_rule_attr(self, user_id, spec, val): + def set_rule_attr(self, user_id, spec, val) -> Awaitable: if spec["attr"] == "enabled": if isinstance(val, dict) and "enabled" in val: val = val["enabled"] diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 6562db5c2bde..0c0bd912fd72 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -32,7 +32,7 @@ from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.util.id_generators import ChainedIdGenerator from synapse.util import json_encoder -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -115,9 +115,9 @@ def get_max_push_rules_stream_id(self): """ raise NotImplementedError() - @cachedInlineCallbacks(max_entries=5000) - def get_push_rules_for_user(self, user_id): - rows = yield self.db_pool.simple_select_list( + @cached(max_entries=5000) + async def get_push_rules_for_user(self, user_id): + rows = await self.db_pool.simple_select_list( table="push_rules", keyvalues={"user_name": user_id}, retcols=( @@ -133,7 +133,7 @@ def get_push_rules_for_user(self, user_id): rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) - enabled_map = yield self.get_push_rules_enabled_for_user(user_id) + enabled_map = await self.get_push_rules_enabled_for_user(user_id) use_new_defaults = user_id in self._users_new_default_push_rules @@ -141,9 +141,9 @@ def get_push_rules_for_user(self, user_id): return rules - @cachedInlineCallbacks(max_entries=5000) - def get_push_rules_enabled_for_user(self, user_id): - results = yield self.db_pool.simple_select_list( + @cached(max_entries=5000) + async def get_push_rules_enabled_for_user(self, user_id): + results = await self.db_pool.simple_select_list( table="push_rules_enable", keyvalues={"user_name": user_id}, retcols=("user_name", "rule_id", "enabled"), @@ -205,14 +205,15 @@ def bulk_get_push_rules(self, user_ids): return results - @defer.inlineCallbacks - def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): + async def 
copy_push_rule_from_room_to_room( + self, new_room_id: str, user_id: str, rule: dict + ) -> None: """Copy a single push rule from one room to another for a specific user. Args: - new_room_id (str): ID of the new room. - user_id (str): ID of user the push rule belongs to. - rule (Dict): A push rule. + new_room_id: ID of the new room. + user_id : ID of user the push rule belongs to. + rule: A push rule. """ # Create new rule id rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) @@ -224,7 +225,7 @@ def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): condition["pattern"] = new_room_id # Add the rule for the new room - yield self.add_push_rule( + await self.add_push_rule( user_id=user_id, rule_id=new_rule_id, priority_class=rule["priority_class"], @@ -232,17 +233,16 @@ def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): actions=rule["actions"], ) - @defer.inlineCallbacks - def copy_push_rules_from_room_to_room_for_user( - self, old_room_id, new_room_id, user_id - ): + async def copy_push_rules_from_room_to_room_for_user( + self, old_room_id: str, new_room_id: str, user_id: str + ) -> None: """Copy all of the push rules from one room to another for a specific user. Args: - old_room_id (str): ID of the old room. - new_room_id (str): ID of the new room. - user_id (str): ID of user to copy push rules for. + old_room_id: ID of the old room. + new_room_id: ID of the new room. + user_id: ID of user to copy push rules for. """ # Retrieve push rules for this user user_push_rules = yield self.get_push_rules_for_user(user_id) @@ -254,7 +254,7 @@ def copy_push_rules_from_room_to_room_for_user( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions ): - yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) + await self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) @cachedList( cached_method_name="get_push_rules_enabled_for_user", @@ -332,8 +332,7 @@ def get_all_push_rule_updates_txn(txn): class PushRuleStore(PushRulesWorkerStore): - @defer.inlineCallbacks - def add_push_rule( + async def add_push_rule( self, user_id, rule_id, @@ -342,13 +341,13 @@ def add_push_rule( actions, before=None, after=None, - ): + ) -> None: conditions_json = json_encoder.encode(conditions) actions_json = json_encoder.encode(actions) with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids if before or after: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_add_push_rule_relative_txn", self._add_push_rule_relative_txn, stream_id, @@ -362,7 +361,7 @@ def add_push_rule( after, ) else: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_add_push_rule_highest_priority_txn", self._add_push_rule_highest_priority_txn, stream_id, @@ -546,16 +545,15 @@ def _upsert_push_rule_txn( }, ) - @defer.inlineCallbacks - def delete_push_rule(self, user_id, rule_id): + async def delete_push_rule(self, user_id: str, rule_id: str) -> None: """ Delete a push rule. 
Args specify the row to be deleted and can be any of the columns in the push_rule table, but below are the standard ones Args: - user_id (str): The matrix ID of the push rule owner - rule_id (str): The rule_id of the rule to be deleted + user_id: The matrix ID of the push rule owner + rule_id: The rule_id of the rule to be deleted """ def delete_push_rule_txn(txn, stream_id, event_stream_ordering): @@ -569,18 +567,17 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering, ) - @defer.inlineCallbacks - def set_push_rule_enabled(self, user_id, rule_id, enabled): + async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None: with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, stream_id, @@ -611,8 +608,9 @@ def _set_push_rule_enabled_txn( op="ENABLE" if enabled else "DISABLE", ) - @defer.inlineCallbacks - def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): + async def set_push_rule_actions( + self, user_id, rule_id, actions, is_default_rule + ) -> None: actions_json = json_encoder.encode(actions) def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): @@ -653,7 +651,7 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "set_push_rule_actions", set_push_rule_actions_txn, stream_id, From 0411e50707b1bb9eb7210fed3a5e6ceb7ccf32d6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 12 Aug 2020 16:31:51 -0400 Subject: [PATCH 3/6] Add changelog. --- changelog.d/8075.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8075.misc diff --git a/changelog.d/8075.misc b/changelog.d/8075.misc new file mode 100644 index 000000000000..dfe4c03171d6 --- /dev/null +++ b/changelog.d/8075.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. From dc5ce2f61759eadb0b81b3a3a87c89ac343fa8a5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 12 Aug 2020 16:39:42 -0400 Subject: [PATCH 4/6] Fix mypy error. --- synapse/storage/databases/main/push_rule.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 0c0bd912fd72..09919f55d49b 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -137,9 +137,7 @@ async def get_push_rules_for_user(self, user_id): use_new_defaults = user_id in self._users_new_default_push_rules - rules = _load_rules(rows, enabled_map, use_new_defaults) - - return rules + return _load_rules(rows, enabled_map, use_new_defaults) @cached(max_entries=5000) async def get_push_rules_enabled_for_user(self, user_id): @@ -245,7 +243,7 @@ async def copy_push_rules_from_room_to_room_for_user( user_id: ID of user to copy push rules for. 
""" # Retrieve push rules for this user - user_push_rules = yield self.get_push_rules_for_user(user_id) + user_push_rules = await self.get_push_rules_for_user(user_id) # Get rules relating to the old room and copy them to the new room for rule in user_push_rules: From 1470031bd3b51368cc7bfa2bf629b612c11075b8 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 10:02:19 -0400 Subject: [PATCH 5/6] Make set_rule_attr async. --- synapse/rest/client/v1/push_rule.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 0cd365082422..c8dcf02d2be7 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -164,7 +164,7 @@ def notify_user(self, user_id): stream_id, _ = self.store.get_push_rules_stream_token() self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) - def set_rule_attr(self, user_id, spec, val) -> Awaitable: + async def set_rule_attr(self, user_id, spec, val): if spec["attr"] == "enabled": if isinstance(val, dict) and "enabled" in val: val = val["enabled"] @@ -174,7 +174,7 @@ def set_rule_attr(self, user_id, spec, val) -> Awaitable: # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) - return self.store.set_push_rule_enabled(user_id, namespaced_rule_id, val) + return await self.store.set_push_rule_enabled(user_id, namespaced_rule_id, val) elif spec["attr"] == "actions": actions = val.get("actions") _check_actions(actions) @@ -189,7 +189,7 @@ def set_rule_attr(self, user_id, spec, val) -> Awaitable: if namespaced_rule_id not in rule_ids: raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) - return self.store.set_push_rule_actions( + return await self.store.set_push_rule_actions( user_id, namespaced_rule_id, actions, is_default_rule ) else: From e39b8aa1d64a7d631866eafae45f9ae5434bf684 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 10:06:27 -0400 Subject: [PATCH 6/6] Fix styling. --- synapse/rest/client/v1/push_rule.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c8dcf02d2be7..e2df638cc570 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Awaitable - from synapse.api.errors import ( NotFoundError, StoreError, @@ -174,7 +172,9 @@ async def set_rule_attr(self, user_id, spec, val): # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) - return await self.store.set_push_rule_enabled(user_id, namespaced_rule_id, val) + return await self.store.set_push_rule_enabled( + user_id, namespaced_rule_id, val + ) elif spec["attr"] == "actions": actions = val.get("actions") _check_actions(actions)