This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert additional databases to async/await part 2 #8200

Merged
merged 9 commits on Sep 1, 2020
1 change: 1 addition & 0 deletions changelog.d/8200.misc
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
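
Every file below follows the same conversion pattern: a storage method that used to return the Deferred produced by `runInteraction` becomes an `async def` that awaits it, so the annotated return type describes the real result rather than a Deferred. A minimal sketch of the before/after, using plain asyncio and hypothetical stand-in names (`FakePool`, `run_interaction`, `get_thing`) rather than Synapse's actual `DatabasePool` API:

```python
# Minimal sketch of the async/await conversion pattern; not Synapse code.
import asyncio


class FakePool:
    """Hypothetical stand-in for Synapse's DatabasePool."""

    async def run_interaction(self, desc, func, *args):
        # Pretend to open a transaction and run `func` inside it.
        return func(None, *args)


class Store:
    def __init__(self, pool: FakePool):
        self.pool = pool

    # Old style: not a coroutine itself; it hands back the awaitable from
    # run_interaction and relies on the caller to await it.
    def get_thing_old(self, key):
        def txn(conn, key):
            return f"value-for-{key}"

        return self.pool.run_interaction("get_thing", txn, key)

    # New style: an async def that awaits the interaction, so the annotation
    # can honestly say the method returns a str.
    async def get_thing(self, key: str) -> str:
        def txn(conn, key):
            return f"value-for-{key}"

        return await self.pool.run_interaction("get_thing", txn, key)


async def main():
    store = Store(FakePool())
    print(await store.get_thing("abc"))      # new style
    print(await store.get_thing_old("abc"))  # old style: caller must await


asyncio.run(main())
```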
19 changes: 12 additions & 7 deletions synapse/events/builder.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import Any, Dict, List, Optional, Tuple, Union

import attr
from nacl.signing import SigningKey
@@ -97,14 +97,14 @@ def state_key(self):
def is_state(self):
return self._state_key is not None

async def build(self, prev_event_ids):
async def build(self, prev_event_ids: List[str]) -> EventBase:
"""Transform into a fully signed and hashed event

Args:
prev_event_ids (list[str]): The event IDs to use as the prev events
prev_event_ids: The event IDs to use as the prev events

Returns:
FrozenEvent
The signed and hashed event.
"""

state_ids = await self._state.get_current_state_ids(
@@ -114,8 +114,13 @@ async def build(self, prev_event_ids):

format_version = self.room_version.event_format
if format_version == EventFormatVersions.V1:
auth_events = await self._store.add_event_hashes(auth_ids)
prev_events = await self._store.add_event_hashes(prev_event_ids)
# The types of auth/prev events changes between event versions.
auth_events = await self._store.add_event_hashes(
auth_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
prev_events = await self._store.add_event_hashes(
prev_event_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
else:
auth_events = auth_ids
prev_events = prev_event_ids
@@ -138,7 +143,7 @@ async def build(self, prev_event_ids):
"unsigned": self.unsigned,
"depth": depth,
"prev_state": [],
}
} # type: Dict[str, Any]

if self.is_state():
event_dict["state_key"] = self._state_key
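
A note on the new type comments in `builder.py`: the `Union[List[str], List[Tuple[str, Dict[str, str]]]]` annotation covers the two shapes that prev/auth event references take depending on the event format. Roughly, with illustrative placeholder values not taken from the PR:

```python
from typing import Dict, List, Tuple, Union

EventRefs = Union[List[str], List[Tuple[str, Dict[str, str]]]]

# EventFormatVersions.V1: each reference pairs an event ID with its hashes,
# which is the shape add_event_hashes produces.
v1_refs: EventRefs = [("$abcdef:example.com", {"sha256": "base64hashhere"})]

# Later event formats: bare event ID strings are used as-is.
v2_refs: EventRefs = ["$someOpaqueEventId"]
```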
13 changes: 3 additions & 10 deletions synapse/handlers/message.py
@@ -49,14 +49,7 @@
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
Requester,
RoomAlias,
StreamToken,
UserID,
create_requester,
)
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -446,7 +439,7 @@ async def create_event(
event_dict: dict,
token_id: Optional[str] = None,
txn_id: Optional[str] = None,
prev_event_ids: Optional[Collection[str]] = None,
prev_event_ids: Optional[List[str]] = None,
Member Author

There are a bunch of changes of Collection -> List, which I think is actually necessary because this eventually gets serialized to JSON?
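
To illustrate the point above (my reading of the comment, not something stated elsewhere in the PR): a `Collection[str]` is satisfied by a `set`, and the stdlib JSON encoder rejects sets, while a `list` serializes cleanly. The function name here is a made-up stand-in:

```python
import json
from typing import Collection


def serialize_prev_events(prev_event_ids: Collection[str]) -> str:
    # Fine when prev_event_ids is a list; raises
    # "TypeError: Object of type set is not JSON serializable" for a set.
    return json.dumps({"prev_events": prev_event_ids})


print(serialize_prev_events(["$event1", "$event2"]))
try:
    serialize_prev_events({"$event1", "$event2"})
except TypeError as e:
    print(e)
```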

require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -786,7 +779,7 @@ async def create_new_client_event(
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
prev_event_ids: Optional[Collection[str]] = None,
prev_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client

12 changes: 2 additions & 10 deletions synapse/handlers/room_member.py
@@ -38,15 +38,7 @@
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
Collection,
JsonDict,
Requester,
RoomAlias,
RoomID,
StateMap,
UserID,
)
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room

@@ -184,7 +176,7 @@ async def _local_membership_update(
target: UserID,
room_id: str,
membership: str,
prev_event_ids: Collection[str],
prev_event_ids: List[str],
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/client_ips.py
@@ -396,7 +396,7 @@ async def insert_client_ip(
self._batch_row_update[key] = (user_agent, device_id, now)

@wrap_as_background_process("update_client_ips")
def _update_client_ips_batch(self):
async def _update_client_ips_batch(self) -> None:

# If the DB pool has already terminated, don't try updating
if not self.db_pool.is_running():
@@ -405,7 +405,7 @@ def _update_client_ips_batch(self):
to_update = self._batch_row_update
self._batch_row_update = {}

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

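
For `client_ips.py`, the method is also decorated with `wrap_as_background_process`, which runs it as a tracked background process when called (typically from a timer) rather than requiring each caller to await it. A rough sketch of that shape with a hypothetical, stripped-down decorator; Synapse's real one also handles metrics and logging contexts and works with Twisted rather than asyncio:

```python
import asyncio
import functools


def wrap_as_background_process(desc: str):
    # Hypothetical minimal stand-in for the real decorator.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Schedule the coroutine and hand back a task; callers are not
            # expected to await the work themselves.
            return asyncio.ensure_future(func(*args, **kwargs))

        return wrapper

    return decorator


class ClientIpStore:
    @wrap_as_background_process("update_client_ips")
    async def _update_client_ips_batch(self) -> None:
        # Stand-in for: await self.db_pool.runInteraction(...)
        await asyncio.sleep(0)


async def main():
    store = ClientIpStore()
    task = store._update_client_ips_batch()  # fire-and-forget
    await task  # awaited here only so the example exits cleanly


asyncio.run(main())
```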
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/directory.py
@@ -159,9 +159,9 @@ def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str:

return room_id

def update_aliases_for_room(
async def update_aliases_for_room(
self, old_room_id: str, new_room_id: str, creator: Optional[str] = None,
):
) -> None:
"""Repoint all of the aliases for a given room, to a different room.

Args:
@@ -189,6 +189,6 @@ def _update_aliases_for_room_txn(txn):
txn, self.get_aliases_for_room, (new_room_id,)
)

return self.db_pool.runInteraction(
await self.db_pool.runInteraction(
"_update_aliases_for_room_txn", _update_aliases_for_room_txn
)
5 changes: 3 additions & 2 deletions synapse/storage/databases/main/filtering.py
@@ -17,6 +17,7 @@

from synapse.api.errors import Codes, SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached


@@ -40,7 +41,7 @@ async def get_user_filter(self, user_localpart, filter_id):

return db_to_json(def_json)

def add_user_filter(self, user_localpart, user_filter):
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> str:
def_json = encode_canonical_json(user_filter)

# Need an atomic transaction to SELECT the maximal ID so far then
@@ -71,4 +72,4 @@ def _do_txn(txn):

return filter_id

return self.db_pool.runInteraction("add_user_filter", _do_txn)
return await self.db_pool.runInteraction("add_user_filter", _do_txn)
8 changes: 6 additions & 2 deletions synapse/storage/databases/main/openid.py
@@ -1,3 +1,5 @@
from typing import Optional

from synapse.storage._base import SQLBaseStore


@@ -15,7 +17,9 @@ async def insert_open_id_token(
desc="insert_open_id_token",
)

def get_user_id_for_open_id_token(self, token, ts_now_ms):
async def get_user_id_for_open_id_token(
self, token: str, ts_now_ms: int
) -> Optional[str]:
def get_user_id_for_token_txn(txn):
sql = (
"SELECT user_id FROM open_id_tokens"
@@ -30,6 +34,6 @@ def get_user_id_for_token_txn(txn):
else:
return rows[0][0]

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_user_id_for_token", get_user_id_for_token_txn
)
6 changes: 4 additions & 2 deletions synapse/storage/databases/main/profile.py
@@ -138,7 +138,9 @@ async def maybe_delete_remote_profile_cache(self, user_id):
desc="delete_remote_profile_cache",
)

def get_remote_profile_cache_entries_that_expire(self, last_checked):
async def get_remote_profile_cache_entries_that_expire(
self, last_checked: int
) -> Dict[str, str]:
"""Get all users who haven't been checked since `last_checked`
"""

@@ -153,7 +155,7 @@ def _get_remote_profile_cache_entries_that_expire_txn(txn):

return self.db_pool.cursor_to_dict(txn)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"get_remote_profile_cache_entries_that_expire",
_get_remote_profile_cache_entries_that_expire_txn,
)
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/push_rule.py
@@ -18,8 +18,6 @@
import logging
from typing import List, Tuple, Union

from twisted.internet import defer

from synapse.push.baserules import list_with_base_rules
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage._base import SQLBaseStore, db_to_json
@@ -149,9 +147,11 @@ async def get_push_rules_enabled_for_user(self, user_id):
)
return {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}

def have_push_rules_changed_for_user(self, user_id, last_id):
async def have_push_rules_changed_for_user(
self, user_id: str, last_id: int
) -> bool:
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
return defer.succeed(False)
return False
else:

def have_push_rules_changed_txn(txn):
@@ -163,7 +163,7 @@ def have_push_rules_changed_txn(txn):
(count,) = txn.fetchone()
return bool(count)

return self.db_pool.runInteraction(
return await self.db_pool.runInteraction(
"have_push_rules_changed", have_push_rules_changed_txn
)
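
The `push_rule.py` hunk shows the other half of the conversion: once the method is `async def`, the short-circuit no longer needs to wrap its result in `defer.succeed(False)` to keep every code path returning a Deferred, so the `twisted.internet.defer` import can be dropped. A tiny sketch of that short-circuit pattern, with a made-up cache and store rather than Synapse's `StreamChangeCache`:

```python
import asyncio


class PushRuleStore:
    def __init__(self):
        # Made-up stand-in for push_rules_stream_cache.has_entity_changed().
        self._changed_users = {"@alice:example.com"}

    async def have_push_rules_changed_for_user(
        self, user_id: str, last_id: int
    ) -> bool:
        if user_id not in self._changed_users:
            # Before the conversion this had to be `return defer.succeed(False)`;
            # in an async def a bare return already yields an awaitable result.
            return False
        # Stand-in for the runInteraction that counts newer stream rows.
        await asyncio.sleep(0)
        return True


async def main():
    store = PushRuleStore()
    print(await store.have_push_rules_changed_for_user("@alice:example.com", 5))
    print(await store.have_push_rules_changed_for_user("@bob:example.com", 5))


asyncio.run(main())
```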
