Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Generate historic pagination token for /messages when no ?from token provided #12370

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12370.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order.
Copy link
Contributor Author

@MadLittleMods MadLittleMods Apr 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably use the same changelog as #12319 as this PR doesn't actually fix anything without additionally merging that PR.

In any case, this PR does update /messages to do the right thing imo so it would be a good change regardless. But with more of changelog like:

Update `/messages` to use historic pagination tokens by default.

2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ fi

# Run the tests!
echo "Images built; running complement"
go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2716,msc3030 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/
11 changes: 9 additions & 2 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester
from synapse.types import JsonDict, Requester, RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -441,7 +441,14 @@ async def get_messages(
if pagin_config.from_token:
from_token = pagin_config.from_token
else:
from_token = self.hs.get_event_sources().get_current_token_for_pagination()
from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination(
room_id
)
)
# We expect `/messages` to use historic pagination tokens by default but
# `/messages` should still works with live tokens when manually provided.
assert from_token.room_key.topological

if pagin_config.limit is None:
# This shouldn't happen as we've set a default limit before this
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ async def get_new_events(
def get_current_key(self) -> RoomStreamToken:
return self.store.get_room_max_token()

def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
return self.store.get_room_events_max_id(room_id)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
10 changes: 6 additions & 4 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,21 +748,23 @@ def _f(txn):
"get_room_event_before_stream_ordering", _f
)

async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
async def get_room_events_max_id(
self, room_id: Optional[str] = None
) -> RoomStreamToken:
"""Returns the current token for rooms stream.

By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological
token.
"""
token = self.get_room_max_stream_ordering()
stream_ordering = self.get_room_max_stream_ordering()
if room_id is None:
return "s%d" % (token,)
return RoomStreamToken(None, stream_ordering)
else:
topo = await self.db_pool.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
return "t%d-%d" % (topo, token)
return RoomStreamToken(topo, stream_ordering)

def get_stream_id_for_event_txn(
self,
Expand Down
4 changes: 2 additions & 2 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get_current_token(self) -> StreamToken:
)
return token

def get_current_token_for_pagination(self) -> StreamToken:
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.

Expand All @@ -80,7 +80,7 @@ def get_current_token_for_pagination(self) -> StreamToken:
The current token for pagination.
"""
token = StreamToken(
room_key=self.sources.room.get_current_key(),
room_key=await self.sources.room.get_current_key_for_room(room_id),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down