Skip to content

Commit

Permalink
Fix regression when bounding future tokens (#17391)
Browse files Browse the repository at this point in the history
Fix bug added in #17386, where we accidentally used `room_key` for the
receipts stream. See first commit.

Reviewable commit-by-commit
  • Loading branch information
erikjohnston committed Jul 2, 2024
1 parent 1ce59d7 commit b905ae2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
1 change: 1 addition & 0 deletions changelog.d/17391.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.
26 changes: 22 additions & 4 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
#

import logging
from typing import TYPE_CHECKING, Sequence, Tuple

import attr
Expand All @@ -41,6 +42,9 @@
from synapse.server import HomeServer


logger = logging.getLogger(__name__)


@attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner:
room: RoomEventSource
Expand Down Expand Up @@ -139,17 +143,31 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken:
key
].get_max_allocated_token()

token = token.copy_and_replace(
key, token.room_key.bound_stream_token(max_token)
)
if max_token < token_value.get_max_stream_pos():
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(
key, token_value.bound_stream_token(max_token)
)
else:
assert isinstance(current_value, int)
if current_value < token_value:
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()

token = token.copy_and_replace(key, min(token_value, max_token))
if max_token < token_value:
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(key, max_token)

return token

Expand Down
37 changes: 31 additions & 6 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,14 @@
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
from synapse.types import JsonDict, StreamKeyType, UserID, create_requester
from synapse.types import (
JsonDict,
MultiWriterStreamToken,
RoomStreamToken,
StreamKeyType,
UserID,
create_requester,
)
from synapse.util import Clock

import tests.unittest
Expand Down Expand Up @@ -999,7 +1006,13 @@ def test_wait_for_future_sync_token(self) -> None:

self.get_success(sync_d, by=1.0)

def test_wait_for_invalid_future_sync_token(self) -> None:
@parameterized.expand(
[(key,) for key in StreamKeyType.__members__.values()],
name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
)
def test_wait_for_invalid_future_sync_token(
self, stream_key: StreamKeyType
) -> None:
"""Like the previous test, except we give a token that has a stream
position ahead of what is in the DB, i.e. its invalid and we shouldn't
wait for the stream to advance (as it may never do so).
Expand All @@ -1010,11 +1023,23 @@ def test_wait_for_invalid_future_sync_token(self) -> None:
"""
user = self.register_user("alice", "password")

# Create a token and arbitrarily advance one of the streams.
# Create a token and advance one of the streams.
current_token = self.hs.get_event_sources().get_current_token()
since_token = current_token.copy_and_advance(
StreamKeyType.PRESENCE, current_token.presence_key + 1
)
token_value = current_token.get_field(stream_key)

# How we advance the streams depends on the type.
if isinstance(token_value, int):
since_token = current_token.copy_and_advance(stream_key, token_value + 1)
elif isinstance(token_value, MultiWriterStreamToken):
since_token = current_token.copy_and_advance(
stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
)
elif isinstance(token_value, RoomStreamToken):
since_token = current_token.copy_and_advance(
stream_key, RoomStreamToken(stream=token_value.stream + 1)
)
else:
raise Exception("Unreachable")

sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(
Expand Down

0 comments on commit b905ae2

Please sign in to comment.