From 8348a34f58ae2922db0b79da5c272b4e648f10c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Sep 2020 16:23:20 +0100 Subject: [PATCH 1/5] Make parsing of stream tokens async --- synapse/rest/client/v1/events.py | 3 ++- synapse/rest/client/v1/initial_sync.py | 3 ++- synapse/rest/client/v1/room.py | 11 ++++++++--- synapse/rest/client/v2_alpha/keys.py | 3 ++- synapse/rest/client/v2_alpha/sync.py | 3 ++- synapse/storage/databases/main/purge_events.py | 8 ++++---- synapse/streams/config.py | 7 ++++--- synapse/types.py | 10 ++++++---- 8 files changed, 30 insertions(+), 18 deletions(-) diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index 985d994f6bb5..1ecb77aa2694 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -33,6 +33,7 @@ def __init__(self, hs): super().__init__() self.event_stream_handler = hs.get_event_stream_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request, allow_guest=True) @@ -44,7 +45,7 @@ async def on_GET(self, request): if b"room_id" in request.args: room_id = request.args[b"room_id"][0].decode("ascii") - pagin_config = PaginationConfig.from_request(request) + pagin_config = await PaginationConfig.from_request(self.store, request) timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS if b"timeout" in request.args: try: diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index d7042786ce0c..91da0ee57303 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -27,11 +27,12 @@ def __init__(self, hs): super().__init__() self.initial_sync_handler = hs.get_initial_sync_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request) as_client_event = b"raw" not in request.args - pagination_config = PaginationConfig.from_request(request) + pagination_config = await PaginationConfig.from_request(self.store, request) include_archived = parse_boolean(request, "archived", default=False) content = await self.initial_sync_handler.snapshot_all_rooms( user_id=requester.user.to_string(), diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 7e64a2e0fe36..b63389e5fedf 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -451,6 +451,7 @@ def __init__(self, hs): super().__init__() self.message_handler = hs.get_message_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) @@ -465,7 +466,7 @@ async def on_GET(self, request, room_id): if at_token_string is None: at_token = None else: - at_token = StreamToken.from_string(at_token_string) + at_token = await StreamToken.from_string(self.store, at_token_string) # let you filter down on particular memberships. 
# XXX: this may not be the best shape for this API - we could pass in a filter @@ -521,10 +522,13 @@ def __init__(self, hs): super().__init__() self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=True) - pagination_config = PaginationConfig.from_request(request, default_limit=10) + pagination_config = await PaginationConfig.from_request( + self.store, request, default_limit=10 + ) as_client_event = b"raw" not in request.args filter_str = parse_string(request, b"filter", encoding="utf-8") if filter_str: @@ -580,10 +584,11 @@ def __init__(self, hs): super().__init__() self.initial_sync_handler = hs.get_initial_sync_handler() self.auth = hs.get_auth() + self.store = hs.get_datastore() async def on_GET(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=True) - pagination_config = PaginationConfig.from_request(request) + pagination_config = await PaginationConfig.from_request(self.store, request) content = await self.initial_sync_handler.room_initial_sync( room_id=room_id, requester=requester, pagin_config=pagination_config ) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 7abd6ff333b2..55c46065694b 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -180,6 +180,7 @@ def __init__(self, hs): super().__init__() self.auth = hs.get_auth() self.device_handler = hs.get_device_handler() + self.store = hs.get_datastore() async def on_GET(self, request): requester = await self.auth.get_user_by_req(request, allow_guest=True) @@ -191,7 +192,7 @@ async def on_GET(self, request): # changes after the "to" as well as before. set_tag("to", parse_string(request, "to")) - from_token = StreamToken.from_string(from_token_string) + from_token = await StreamToken.from_string(self.store, from_token_string) user_id = requester.user.to_string() diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 51e395cc6424..1a32b9dfc71f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -77,6 +77,7 @@ def __init__(self, hs): super().__init__() self.hs = hs self.auth = hs.get_auth() + self.store = hs.get_datastore() self.sync_handler = hs.get_sync_handler() self.clock = hs.get_clock() self.filtering = hs.get_filtering() @@ -152,7 +153,7 @@ async def on_GET(self, request): ) if since is not None: - since_token = StreamToken.from_string(since) + since_token = await StreamToken.from_string(self.store, since) else: since_token = None diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index d7a03cbf7dfc..ecfc6717b33b 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -42,17 +42,17 @@ async def purge_history( The set of state groups that are referenced by deleted events. 
""" + parsed_token = await RoomStreamToken.parse(self, token) + return await self.db_pool.runInteraction( "purge_history", self._purge_history_txn, room_id, - token, + parsed_token, delete_local_events, ) - def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): - token = RoomStreamToken.parse(token_str) - + def _purge_history_txn(self, txn, room_id, token, delete_local_events): # Tables that should be pruned: # event_auth # event_backward_extremities diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 0bdf846edf62..394e47c984ff 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -39,8 +39,9 @@ class PaginationConfig: limit = attr.ib(type=Optional[int]) @classmethod - def from_request( + async def from_request( cls, + store, request: SynapseRequest, raise_invalid_params: bool = True, default_limit: Optional[int] = None, @@ -54,13 +55,13 @@ def from_request( if from_tok == "END": from_tok = None # For backwards compat. elif from_tok: - from_tok = StreamToken.from_string(from_tok) + from_tok = await StreamToken.from_string(store, from_tok) except Exception: raise SynapseError(400, "'from' parameter is invalid") try: if to_tok: - to_tok = StreamToken.from_string(to_tok) + to_tok = await StreamToken.from_string(store, to_tok) except Exception: raise SynapseError(400, "'to' parameter is invalid") diff --git a/synapse/types.py b/synapse/types.py index 02bcc197ec7b..7208a38ca8c7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -393,7 +393,7 @@ class RoomStreamToken: stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) @classmethod - def parse(cls, string: str) -> "RoomStreamToken": + async def parse(cls, store, string: str) -> "RoomStreamToken": try: if string[0] == "s": return cls(topological=None, stream=int(string[1:])) @@ -453,13 +453,15 @@ class StreamToken: START = None # type: StreamToken @classmethod - def from_string(cls, string): + async def from_string(cls, store, string): try: keys = string.split(cls._SEPARATOR) while len(keys) < len(attr.fields(cls)): # i.e. 
old token from before receipt_key keys.append("0") - return cls(RoomStreamToken.parse(keys[0]), *(int(k) for k in keys[1:])) + return cls( + await RoomStreamToken.parse(store, keys[0]), *(int(k) for k in keys[1:]) + ) except Exception: raise SynapseError(400, "Invalid Token") @@ -493,7 +495,7 @@ def copy_and_replace(self, key, new_value) -> "StreamToken": return attr.evolve(self, **{key: new_value}) -StreamToken.START = StreamToken.from_string("s0_0") +StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) @attr.s(slots=True, frozen=True) From bdc0fd0fd48be6de15e5e0d6d5d4b3ab6470488f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 13:57:37 +0100 Subject: [PATCH 2/5] Rename RoomStreamToken.__str__ to .to_string --- synapse/rest/admin/__init__.py | 2 +- synapse/types.py | 16 ++++++++++++++-- tests/rest/client/v1/test_rooms.py | 12 ++++++------ tests/storage/test_purge.py | 6 +++--- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index ba53f66f027d..ce96e12f0b65 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -110,7 +110,7 @@ async def on_POST(self, request, room_id, event_id): raise SynapseError(400, "Event is for wrong room.") room_token = await self.store.get_topological_token_for_event(event_id) - token = str(room_token) + token = room_token.to_string() logger.info("[purge] purging up to token %s (event_id %s)", token, event_id) elif "purge_up_to_ts" in body: diff --git a/synapse/types.py b/synapse/types.py index 7208a38ca8c7..335fb65d1140 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -428,7 +428,7 @@ def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken": def as_tuple(self) -> Tuple[Optional[int], int]: return (self.topological, self.stream) - def __str__(self) -> str: + def to_string(self) -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) else: @@ -466,7 +466,19 @@ async def from_string(cls, store, string): raise SynapseError(400, "Invalid Token") def to_string(self): - return self._SEPARATOR.join([str(k) for k in attr.astuple(self, recurse=False)]) + return self._SEPARATOR.join( + [ + self.room_key.to_string(), + str(self.presence_key), + str(self.typing_key), + str(self.receipt_key), + str(self.account_data_key), + str(self.push_rules_key), + str(self.to_device_key), + str(self.device_list_key), + str(self.groups_key), + ] + ) @property def room_stream_id(self): diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index a3287011e94a..237be2e3093f 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -902,16 +902,16 @@ def test_room_messages_purge(self): # Send a first message in the room, which will be removed by the purge. first_event_id = self.helper.send(self.room_id, "message 1")["event_id"] - first_token = str( - self.get_success(store.get_topological_token_for_event(first_event_id)) - ) + first_token = self.get_success( + store.get_topological_token_for_event(first_event_id) + ).to_string() # Send a second message in the room, which won't be removed, and which we'll # use as the marker to purge events before. 
second_event_id = self.helper.send(self.room_id, "message 2")["event_id"] - second_token = str( - self.get_success(store.get_topological_token_for_event(second_event_id)) - ) + second_token = self.get_success( + store.get_topological_token_for_event(second_event_id) + ).to_string() # Send a third event in the room to ensure we don't fall under any edge case # due to our marker being the latest forward extremity in the room. diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 723cd2893354..e00b75696642 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -47,9 +47,9 @@ def test_purge(self): storage = self.hs.get_storage() # Get the topological token - event = str( - self.get_success(store.get_topological_token_for_event(last["event_id"])) - ) + event = self.get_success( + store.get_topological_token_for_event(last["event_id"]) + ).to_string() # Purge everything before this topological token self.get_success(storage.purge_events.purge_history(self.room_id, event, True)) From af3de2f3eedc048c118ef947fa5180a098eb869e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 15:28:25 +0100 Subject: [PATCH 3/5] Make StreamToken.to_string async --- synapse/handlers/events.py | 4 ++-- synapse/handlers/initial_sync.py | 14 +++++++------- synapse/handlers/pagination.py | 8 ++++---- synapse/handlers/room.py | 8 +++++--- synapse/handlers/search.py | 8 ++++---- synapse/rest/admin/__init__.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 4 ++-- synapse/types.py | 21 +++++++++++++++++---- tests/rest/client/v1/test_rooms.py | 26 ++++++++++++++++++++------ tests/storage/test_purge.py | 9 ++++++--- 10 files changed, 68 insertions(+), 36 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 0875b74ea89c..539b4fc32e95 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -133,8 +133,8 @@ async def get_stream( chunk = { "chunk": chunks, - "start": tokens[0].to_string(), - "end": tokens[1].to_string(), + "start": await tokens[0].to_string(self.store), + "end": await tokens[1].to_string(self.store), } return chunk diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 43f15435def5..39a85801c1ad 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -203,8 +203,8 @@ async def handle_room(event: RoomsForUser): messages, time_now=time_now, as_client_event=as_client_event ) ), - "start": start_token.to_string(), - "end": end_token.to_string(), + "start": await start_token.to_string(self.store), + "end": await end_token.to_string(self.store), } d["state"] = await self._event_serializer.serialize_events( @@ -249,7 +249,7 @@ async def handle_room(event: RoomsForUser): ], "account_data": account_data_events, "receipts": receipt, - "end": now_token.to_string(), + "end": await now_token.to_string(self.store), } return ret @@ -348,8 +348,8 @@ async def _room_initial_sync_parted( "chunk": ( await self._event_serializer.serialize_events(messages, time_now) ), - "start": start_token.to_string(), - "end": end_token.to_string(), + "start": await start_token.to_string(self.store), + "end": await end_token.to_string(self.store), }, "state": ( await self._event_serializer.serialize_events( @@ -447,8 +447,8 @@ async def get_receipts(): "chunk": ( await self._event_serializer.serialize_events(messages, time_now) ), - "start": start_token.to_string(), - "end": end_token.to_string(), + "start": await start_token.to_string(self.store), + "end": await 
end_token.to_string(self.store), }, "state": state, "presence": presence, diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index d6779a4b4439..2c2a633938ba 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -413,8 +413,8 @@ async def get_messages( if not events: return { "chunk": [], - "start": from_token.to_string(), - "end": next_token.to_string(), + "start": await from_token.to_string(self.store), + "end": await next_token.to_string(self.store), } state = None @@ -442,8 +442,8 @@ async def get_messages( events, time_now, as_client_event=as_client_event ) ), - "start": from_token.to_string(), - "end": next_token.to_string(), + "start": await from_token.to_string(self.store), + "end": await next_token.to_string(self.store), } if state: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 836b3f381a52..d5f7c78edf52 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1077,11 +1077,13 @@ def filter_evts(events): # the token, which we replace. token = StreamToken.START - results["start"] = token.copy_and_replace( + results["start"] = await token.copy_and_replace( "room_key", results["start"] - ).to_string() + ).to_string(self.store) - results["end"] = token.copy_and_replace("room_key", results["end"]).to_string() + results["end"] = await token.copy_and_replace( + "room_key", results["end"] + ).to_string(self.store) return results diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 6a76c20d7913..e9402e6e2efc 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -362,13 +362,13 @@ async def search(self, user, content, batch=None): self.storage, user.to_string(), res["events_after"] ) - res["start"] = now_token.copy_and_replace( + res["start"] = await now_token.copy_and_replace( "room_key", res["start"] - ).to_string() + ).to_string(self.store) - res["end"] = now_token.copy_and_replace( + res["end"] = await now_token.copy_and_replace( "room_key", res["end"] - ).to_string() + ).to_string(self.store) if include_profile: senders = { diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index ce96e12f0b65..57cac22252f7 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -110,7 +110,7 @@ async def on_POST(self, request, room_id, event_id): raise SynapseError(400, "Event is for wrong room.") room_token = await self.store.get_topological_token_for_event(event_id) - token = room_token.to_string() + token = await room_token.to_string(self.store) logger.info("[purge] purging up to token %s (event_id %s)", token, event_id) elif "purge_up_to_ts" in body: diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 1a32b9dfc71f..63a8a1dd72c7 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -237,7 +237,7 @@ async def encode_response(self, time_now, sync_result, access_token_id, filter): "leave": sync_result.groups.leave, }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, - "next_batch": sync_result.next_batch.to_string(), + "next_batch": await sync_result.next_batch.to_string(self.store), } @staticmethod @@ -414,7 +414,7 @@ def serialize(events): result = { "timeline": { "events": serialized_timeline, - "prev_batch": room.timeline.prev_batch.to_string(), + "prev_batch": await room.timeline.prev_batch.to_string(self.store), "limited": room.timeline.limited, }, "state": {"events": serialized_state}, diff --git 
a/synapse/types.py b/synapse/types.py index 335fb65d1140..c258e605b586 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,17 @@ import string import sys from collections import namedtuple -from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Mapping, + MutableMapping, + Optional, + Tuple, + Type, + TypeVar, +) import attr from signedjson.key import decode_verify_key_bytes @@ -26,6 +36,9 @@ from synapse.api.errors import Codes, SynapseError +if TYPE_CHECKING: + from synapse.storage.databases.main import DataStore + # define a version of typing.Collection that works on python 3.5 if sys.version_info[:3] >= (3, 6, 0): from typing import Collection @@ -428,7 +441,7 @@ def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken": def as_tuple(self) -> Tuple[Optional[int], int]: return (self.topological, self.stream) - def to_string(self) -> str: + async def to_string(self, store: "DataStore") -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) else: @@ -465,10 +478,10 @@ async def from_string(cls, store, string): except Exception: raise SynapseError(400, "Invalid Token") - def to_string(self): + async def to_string(self, store: "DataStore") -> str: return self._SEPARATOR.join( [ - self.room_key.to_string(), + await self.room_key.to_string(store), str(self.presence_key), str(self.typing_key), str(self.receipt_key), diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 237be2e3093f..0d809d25d5d4 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -904,14 +904,16 @@ def test_room_messages_purge(self): first_event_id = self.helper.send(self.room_id, "message 1")["event_id"] first_token = self.get_success( store.get_topological_token_for_event(first_event_id) - ).to_string() + ) + first_token_str = self.get_success(first_token.to_string(store)) # Send a second message in the room, which won't be removed, and which we'll # use as the marker to purge events before. second_event_id = self.helper.send(self.room_id, "message 2")["event_id"] second_token = self.get_success( store.get_topological_token_for_event(second_event_id) - ).to_string() + ) + second_token_str = self.get_success(second_token.to_string(store)) # Send a third event in the room to ensure we don't fall under any edge case # due to our marker being the latest forward extremity in the room. 
@@ -921,7 +923,11 @@ def test_room_messages_purge(self): request, channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s" - % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})), + % ( + self.room_id, + second_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) @@ -936,7 +942,7 @@ def test_room_messages_purge(self): pagination_handler._purge_history( purge_id=purge_id, room_id=self.room_id, - token=second_token, + token=second_token_str, delete_local_events=True, ) ) @@ -946,7 +952,11 @@ def test_room_messages_purge(self): request, channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s" - % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})), + % ( + self.room_id, + second_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) @@ -960,7 +970,11 @@ def test_room_messages_purge(self): request, channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s" - % (self.room_id, first_token, json.dumps({"types": [EventTypes.Message]})), + % ( + self.room_id, + first_token_str, + json.dumps({"types": [EventTypes.Message]}), + ), ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index e00b75696642..cc1f3c53c51c 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -47,12 +47,15 @@ def test_purge(self): storage = self.hs.get_storage() # Get the topological token - event = self.get_success( + token = self.get_success( store.get_topological_token_for_event(last["event_id"]) - ).to_string() + ) + token_str = self.get_success(token.to_string(self.hs.get_datastore())) # Purge everything before this topological token - self.get_success(storage.purge_events.purge_history(self.room_id, event, True)) + self.get_success( + storage.purge_events.purge_history(self.room_id, token_str, True) + ) # 1-3 should fail and last will succeed, meaning that 1-3 are deleted # and last is not. From 482407e0caaa5fb1ae8a9e2edda7c69f74dccbc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 16:35:02 +0100 Subject: [PATCH 4/5] Newsfile --- changelog.d/8427.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8427.misc diff --git a/changelog.d/8427.misc b/changelog.d/8427.misc new file mode 100644 index 000000000000..c9656b9112b2 --- /dev/null +++ b/changelog.d/8427.misc @@ -0,0 +1 @@ +Make stream token serializing/deserializing async. From 4bb33429b3e7286de65d5079040701acd42ddf0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Sep 2020 18:54:03 +0100 Subject: [PATCH 5/5] MOAR TYPES FOR EVERYONE --- synapse/rest/client/v2_alpha/sync.py | 3 +-- synapse/streams/config.py | 4 ++-- synapse/types.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 63a8a1dd72c7..6779df952f77 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -152,10 +152,9 @@ async def on_GET(self, request): device_id=device_id, ) + since_token = None if since is not None: since_token = await StreamToken.from_string(self.store, since) - else: - since_token = None # send any outstanding server notices to the user. 
await self._server_notices_sender.on_user_syncing(user.to_string()) diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 394e47c984ff..fdda21d16584 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Optional @@ -21,6 +20,7 @@ from synapse.api.errors import SynapseError from synapse.http.servlet import parse_integer, parse_string from synapse.http.site import SynapseRequest +from synapse.storage.databases.main import DataStore from synapse.types import StreamToken logger = logging.getLogger(__name__) @@ -41,7 +41,7 @@ class PaginationConfig: @classmethod async def from_request( cls, - store, + store: "DataStore", request: SynapseRequest, raise_invalid_params: bool = True, default_limit: Optional[int] = None, diff --git a/synapse/types.py b/synapse/types.py index c258e605b586..bd271f9f1611 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -406,7 +406,7 @@ class RoomStreamToken: stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) @classmethod - async def parse(cls, store, string: str) -> "RoomStreamToken": + async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken": try: if string[0] == "s": return cls(topological=None, stream=int(string[1:])) @@ -466,7 +466,7 @@ class StreamToken: START = None # type: StreamToken @classmethod - async def from_string(cls, store, string): + async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": try: keys = string.split(cls._SEPARATOR) while len(keys) < len(attr.fields(cls)):
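
For context, the calling convention after this series looks roughly like the sketch below. This is an illustrative example rather than part of the patches; `store` stands in for the object returned by `hs.get_datastore()`, which the new signatures expect to be threaded through.

    from synapse.types import StreamToken

    async def roundtrip_token(store, token_string: str) -> str:
        # Parsing a client-supplied token now takes the datastore and must be awaited.
        token = await StreamToken.from_string(store, token_string)
        # Serialising back to a string is likewise async and takes the store.
        return await token.to_string(store)
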