From dd2112e9e25ceb99989a5aaa8e55d3d4ea93305d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 19:01:58 -0500 Subject: [PATCH 1/6] Generate historic pagination token for /messages when no from token provided Part of https://github.com/matrix-org/synapse/issues/12281 Context: https://github.com/matrix-org/synapse/pull/12319#discussion_r840276412 --- scripts-dev/complement.sh | 2 +- synapse/handlers/pagination.py | 51 +++++++++++++++++++++++- synapse/handlers/room.py | 2 +- synapse/python_dependencies.py | 1 + synapse/storage/databases/main/stream.py | 47 ++++++++++++++++++++-- synapse/streams/events.py | 6 +-- 6 files changed, 98 insertions(+), 11 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index d1b59ff0401b..919016abc085 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -71,4 +71,4 @@ fi # Run the tests! echo "Images built; running complement" -go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... +go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/ diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 876b879483e7..218da77a59e1 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester +from synapse.types import JsonDict, Requester, RoomStreamToken from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -441,7 +441,54 @@ async def get_messages( if pagin_config.from_token: from_token = pagin_config.from_token else: - from_token = self.hs.get_event_sources().get_current_token_for_pagination() + from_token = ( + await self.hs.get_event_sources().get_current_token_for_pagination( + room_id + ) + ) + assert from_token.room_key.topological + # from_live_token = ( + # self.hs.get_event_sources().get_current_token_for_pagination() + # ) + # # Convert the live token (sXXX) into a historic token (tXXX-XXX) + # # which is more suitable for /messages. + # current_stream_ordering = from_live_token.room_key.stream + # current_topographical_ordering = ( + # await self.store.get_current_topological_token( + # room_id, current_stream_ordering + # ) + # ) + # from_token = from_live_token.copy_and_replace( + # "room_key", + # RoomStreamToken( + # current_topographical_ordering, current_stream_ordering + # ), + # ) + # logger.info( + # "get_messages(room_id=%s)\n\tfrom_token=%s\n\tcurrent_stream_ordering=%s\n\tcurrent_topographical_ordering=%s\n\tfrom_live_token=%s", + # room_id, + # await from_token.to_string(self.store), + # current_stream_ordering, + # current_topographical_ordering, + # await from_live_token.to_string(self.store), + # ) + logger.info( + "get_messages(room_id=%s)\n\tfrom_token=%s", + room_id, + await from_token.to_string(self.store), + ) + logger.info( + "asdf_get_debug_events_in_room_ordered_by_depth %s", + await self.store.asdf_get_debug_events_in_room_ordered_by_depth( + room_id + ), + ) + logger.info( + "asdf_get_debug_events_in_room_ordered_by_stream_ordering %s", + await self.store.asdf_get_debug_events_in_room_ordered_by_stream_ordering( + room_id + ), + ) if pagin_config.limit is None: # This shouldn't happen as we've set a default limit before this diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 51a08fd2c08d..793cb30c7976 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1444,7 +1444,7 @@ async def get_new_events( def get_current_key(self) -> RoomStreamToken: return self.store.get_room_max_token() - def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: + def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]: return self.store.get_room_events_max_id(room_id) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8419ab3aca95..0623a2a58ad6 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -89,6 +89,7 @@ "matrix-common~=1.1.0", # We need packaging.requirements.Requirement, added in 16.1. "packaging>=16.1", + "tabulate>=0.8.9", ] CONDITIONAL_REQUIREMENTS = { diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8e764790dbc2..0227a9e01035 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -42,6 +42,7 @@ from frozendict import frozendict from twisted.internet import defer +from tabulate import tabulate from synapse.api.filtering import Filter from synapse.events import EventBase @@ -748,21 +749,21 @@ def _f(txn): "get_room_event_before_stream_ordering", _f ) - async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str: + async def get_room_events_max_id(self, room_id: Optional[str] = None) -> RoomStreamToken: """Returns the current token for rooms stream. By default, it returns the current global stream token. Specifying a `room_id` causes it to return the current room specific topological token. """ - token = self.get_room_max_stream_ordering() + stream_ordering = self.get_room_max_stream_ordering() if room_id is None: - return "s%d" % (token,) + return RoomStreamToken(None, stream_ordering) else: topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return "t%d-%d" % (topo, token) + return RoomStreamToken(topo, stream_ordering) def get_stream_id_for_event_txn( self, @@ -808,6 +809,44 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke ) return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) + async def asdf_get_debug_events_in_room_ordered_by_depth(self, room_id: str) -> Any: + """Gets the topological token in a room after or at the given stream + ordering. + + Args: + room_id + """ + sql = ( + "SELECT depth, stream_ordering, type, state_key, event_id FROM events" + " WHERE events.room_id = ?" + " ORDER BY depth DESC, stream_ordering DESC;" + ) + rows = await self.db_pool.execute( + "asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id + ) + + headers = ["depth", "stream_ordering", "type", "state_key", "event_id"] + return tabulate(rows, headers=headers) + + async def asdf_get_debug_events_in_room_ordered_by_stream_ordering(self, room_id: str) -> Any: + """Gets the topological token in a room after or at the given stream + ordering. + + Args: + room_id + """ + sql = ( + "SELECT depth, stream_ordering, type, state_key, event_id FROM events" + " WHERE events.room_id = ?" + " ORDER BY stream_ordering DESC, depth DESC;" + ) + rows = await self.db_pool.execute( + "asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id + ) + + headers = ["depth", "stream_ordering", "type", "state_key", "event_id"] + return tabulate(rows, headers=headers) + async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream ordering. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index fb8fe1729516..09341f935c15 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator, Tuple +from typing import TYPE_CHECKING, Iterator, Optional, Tuple import attr @@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken: ) return token - def get_current_token_for_pagination(self) -> StreamToken: + async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: """Get the current token for a given room to be used to paginate events. @@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken: The current token for pagination. """ token = StreamToken( - room_key=self.sources.room.get_current_key(), + room_key=await self.sources.room.get_current_key_for_room(room_id), presence_key=0, typing_key=0, receipt_key=0, From ede118c611c2ab6be0805073a13fd843ecfd4b83 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 20:05:24 -0500 Subject: [PATCH 2/6] Clean up PR --- changelog.d/12370.bugfix | 1 + synapse/handlers/pagination.py | 44 ++---------------------- synapse/python_dependencies.py | 1 - synapse/storage/databases/main/stream.py | 43 ++--------------------- synapse/streams/events.py | 2 +- 5 files changed, 7 insertions(+), 84 deletions(-) create mode 100644 changelog.d/12370.bugfix diff --git a/changelog.d/12370.bugfix b/changelog.d/12370.bugfix new file mode 100644 index 000000000000..97dca978292b --- /dev/null +++ b/changelog.d/12370.bugfix @@ -0,0 +1 @@ +Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 218da77a59e1..d296d390bdcb 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -446,49 +446,9 @@ async def get_messages( room_id ) ) + # We expect `/messages` to use historic pagination tokens by default but + # `/messages` should still works with live tokens when manually provided. assert from_token.room_key.topological - # from_live_token = ( - # self.hs.get_event_sources().get_current_token_for_pagination() - # ) - # # Convert the live token (sXXX) into a historic token (tXXX-XXX) - # # which is more suitable for /messages. - # current_stream_ordering = from_live_token.room_key.stream - # current_topographical_ordering = ( - # await self.store.get_current_topological_token( - # room_id, current_stream_ordering - # ) - # ) - # from_token = from_live_token.copy_and_replace( - # "room_key", - # RoomStreamToken( - # current_topographical_ordering, current_stream_ordering - # ), - # ) - # logger.info( - # "get_messages(room_id=%s)\n\tfrom_token=%s\n\tcurrent_stream_ordering=%s\n\tcurrent_topographical_ordering=%s\n\tfrom_live_token=%s", - # room_id, - # await from_token.to_string(self.store), - # current_stream_ordering, - # current_topographical_ordering, - # await from_live_token.to_string(self.store), - # ) - logger.info( - "get_messages(room_id=%s)\n\tfrom_token=%s", - room_id, - await from_token.to_string(self.store), - ) - logger.info( - "asdf_get_debug_events_in_room_ordered_by_depth %s", - await self.store.asdf_get_debug_events_in_room_ordered_by_depth( - room_id - ), - ) - logger.info( - "asdf_get_debug_events_in_room_ordered_by_stream_ordering %s", - await self.store.asdf_get_debug_events_in_room_ordered_by_stream_ordering( - room_id - ), - ) if pagin_config.limit is None: # This shouldn't happen as we've set a default limit before this diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 0623a2a58ad6..8419ab3aca95 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -89,7 +89,6 @@ "matrix-common~=1.1.0", # We need packaging.requirements.Requirement, added in 16.1. "packaging>=16.1", - "tabulate>=0.8.9", ] CONDITIONAL_REQUIREMENTS = { diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 0227a9e01035..8dfa3145608c 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -42,7 +42,6 @@ from frozendict import frozendict from twisted.internet import defer -from tabulate import tabulate from synapse.api.filtering import Filter from synapse.events import EventBase @@ -749,7 +748,9 @@ def _f(txn): "get_room_event_before_stream_ordering", _f ) - async def get_room_events_max_id(self, room_id: Optional[str] = None) -> RoomStreamToken: + async def get_room_events_max_id( + self, room_id: Optional[str] = None + ) -> RoomStreamToken: """Returns the current token for rooms stream. By default, it returns the current global stream token. Specifying a @@ -809,44 +810,6 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke ) return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) - async def asdf_get_debug_events_in_room_ordered_by_depth(self, room_id: str) -> Any: - """Gets the topological token in a room after or at the given stream - ordering. - - Args: - room_id - """ - sql = ( - "SELECT depth, stream_ordering, type, state_key, event_id FROM events" - " WHERE events.room_id = ?" - " ORDER BY depth DESC, stream_ordering DESC;" - ) - rows = await self.db_pool.execute( - "asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id - ) - - headers = ["depth", "stream_ordering", "type", "state_key", "event_id"] - return tabulate(rows, headers=headers) - - async def asdf_get_debug_events_in_room_ordered_by_stream_ordering(self, room_id: str) -> Any: - """Gets the topological token in a room after or at the given stream - ordering. - - Args: - room_id - """ - sql = ( - "SELECT depth, stream_ordering, type, state_key, event_id FROM events" - " WHERE events.room_id = ?" - " ORDER BY stream_ordering DESC, depth DESC;" - ) - rows = await self.db_pool.execute( - "asdf_get_debug_events_in_room_ordered_by_depth", None, sql, room_id - ) - - headers = ["depth", "stream_ordering", "type", "state_key", "event_id"] - return tabulate(rows, headers=headers) - async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream ordering. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 09341f935c15..acf17ba623f4 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator, Optional, Tuple +from typing import TYPE_CHECKING, Iterator, Tuple import attr From bbb21e87fc16a2ea660e71ea5a02c7742e3f2245 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 20:08:16 -0500 Subject: [PATCH 3/6] Revert complement test script changes to make it stream test output as it runs --- scripts-dev/complement.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 919016abc085..d1b59ff0401b 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -71,4 +71,4 @@ fi # Run the tests! echo "Images built; running complement" -go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/ +go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... From b8ef19b48cb06a0b98a522889ac985d7641b645a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 20:11:34 -0500 Subject: [PATCH 4/6] Remove unused type --- synapse/handlers/pagination.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index d296d390bdcb..7ee334037376 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, RoomStreamToken +from synapse.types import JsonDict, Requester from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client From c3deca06cf0fb6dea4ecb29acc9188b6334216a8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 20:37:25 -0500 Subject: [PATCH 5/6] Fix tests missing required arg --- tests/storage/test_stream.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 52e41cdab406..78663a53fe0b 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -110,7 +110,9 @@ def prepare(self, reactor, clock, homeserver): def _filter_messages(self, filter: JsonDict) -> List[EventBase]: """Make a request to /messages with a filter, returns the chunk of events.""" - from_token = self.hs.get_event_sources().get_current_token_for_pagination() + from_token = self.get_success( + self.hs.get_event_sources().get_current_token_for_pagination(self.room_id) + ) events, next_key = self.get_success( self.hs.get_datastores().main.paginate_room_events( From 36243f39569af32adecc48c26c171adc6b5e2eeb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 4 Apr 2022 21:09:30 -0500 Subject: [PATCH 6/6] Update docstring and function name --- synapse/handlers/room.py | 2 +- synapse/storage/databases/main/stream.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 793cb30c7976..fb04cd083cfd 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1445,7 +1445,7 @@ def get_current_key(self) -> RoomStreamToken: return self.store.get_room_max_token() def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]: - return self.store.get_room_events_max_id(room_id) + return self.store.get_current_room_stream_token_for_room_id(room_id) class ShutdownRoomResponse(TypedDict): diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8dfa3145608c..82e9ef02d269 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -748,14 +748,14 @@ def _f(txn): "get_room_event_before_stream_ordering", _f ) - async def get_room_events_max_id( + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: - """Returns the current token for rooms stream. + """Returns the current position of the rooms stream. - By default, it returns the current global stream token. Specifying a - `room_id` causes it to return the current room specific topological - token. + By default, it returns a live token with the current global stream + token. Specifying a `room_id` causes it to return a historic token with + the room specific topological token. """ stream_ordering = self.get_room_max_stream_ordering() if room_id is None: