Fix sync waiting for an invalid token from the "future" (#17386)
Fixes #17274, hopefully.

Basically, old versions of Synapse could advance streams without
persisting anything in the DB (fixed in #17229). On restart those
updates would be lost, so the stream would revert to an earlier
position. If this happened across an upgrade to a later Synapse version
that included #17215, then sync could get blocked indefinitely (until
the stream advanced to the position in the token).

We fix this by bounding the stream positions we'll wait for to the
maximum position of the underlying stream ID generator.
erikjohnston committed Jul 2, 2024
1 parent 5b5280e commit 1ce59d7
Showing 17 changed files with 229 additions and 31 deletions.
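
In outline, the workaround is: if a token's stream position is ahead of the worker's current position, clamp it to the maximum ID the underlying stream ID generator has actually allocated, so the stream can genuinely catch up to it. A minimal sketch of the idea, with made-up names (the real per-stream plumbing is in the diffs below):

def bound_position(token_pos: int, current_pos: int, max_allocated: int) -> int:
    # Token is at or behind the stream: safe to wait for as-is.
    if token_pos <= current_pos:
        return token_pos
    # Token is from the "future": clamp it to what has actually been
    # persisted, so waiting for it can terminate.
    return min(token_pos, max_allocated)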
1 change: 1 addition & 0 deletions changelog.d/17386.bugfix
@@ -0,0 +1 @@
Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.
7 changes: 7 additions & 0 deletions synapse/notifier.py
@@ -764,6 +764,13 @@ async def check_for_updates(

async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
"""Wait for this worker to catch up with the given stream token."""
current_token = self.event_sources.get_current_token()
if stream_token.is_before_or_eq(current_token):
return True

# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
stream_token = await self.event_sources.bound_future_token(stream_token)

start = self.clock.time_msec()
while True:
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/account_data.py
@@ -43,10 +43,7 @@
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    MultiWriterIdGenerator,
-)
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -71,7 +68,7 @@ def __init__(
self._instance_name in hs.config.worker.writers.account_data
)

-        self._account_data_id_gen: AbstractStreamIdGenerator
+        self._account_data_id_gen: MultiWriterIdGenerator

self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
@@ -113,6 +110,9 @@ def get_max_account_data_stream_id(self) -> int:
"""
return self._account_data_id_gen.get_current_token()

def get_account_data_id_generator(self) -> MultiWriterIdGenerator:
return self._account_data_id_gen

@cached()
async def get_global_account_data_for_user(
self, user_id: str
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/deviceinbox.py
@@ -50,10 +50,7 @@
LoggingTransaction,
make_in_list_sql_clause,
)
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    MultiWriterIdGenerator,
-)
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
@@ -92,7 +89,7 @@ def __init__(
self._instance_name in hs.config.worker.writers.to_device
)

-        self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
+        self._to_device_msg_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
@@ -169,6 +166,9 @@ def process_replication_position(
def get_to_device_stream_token(self) -> int:
return self._to_device_msg_id_gen.get_current_token()

def get_to_device_id_generator(self) -> MultiWriterIdGenerator:
return self._to_device_msg_id_gen

async def get_messages_for_user_devices(
self,
user_ids: Collection[str],
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/devices.py
@@ -243,6 +243,9 @@ def device_lists_in_rooms_have_changed(
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()

def get_device_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._device_list_id_gen

async def count_devices_by_users(
self, user_ids: Optional[Collection[str]] = None
) -> int:
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events_worker.py
@@ -192,8 +192,8 @@ def __init__(
):
super().__init__(database, db_conn, hs)

-        self._stream_id_gen: AbstractStreamIdGenerator
-        self._backfill_id_gen: AbstractStreamIdGenerator
+        self._stream_id_gen: MultiWriterIdGenerator
+        self._backfill_id_gen: MultiWriterIdGenerator

self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/presence.py
@@ -42,10 +42,7 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.types import Connection
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    MultiWriterIdGenerator,
-)
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
@@ -83,7 +80,7 @@ def __init__(
super().__init__(database, db_conn, hs)

self._instance_name = hs.get_instance_name()
-        self._presence_id_gen: AbstractStreamIdGenerator
+        self._presence_id_gen: MultiWriterIdGenerator

self._can_persist_presence = (
self._instance_name in hs.config.worker.writers.presence
@@ -455,6 +452,9 @@ async def get_presence_for_all_users(
def get_current_presence_token(self) -> int:
return self._presence_id_gen.get_current_token()

def get_presence_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._presence_id_gen

def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]:
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/push_rule.py
@@ -178,6 +178,9 @@ def get_max_push_rules_stream_id(self) -> int:
"""
return self._push_rules_stream_id_gen.get_current_token()

def get_push_rules_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._push_rules_stream_id_gen

def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:
10 changes: 5 additions & 5 deletions synapse/storage/databases/main/receipts.py
@@ -45,10 +45,7 @@
LoggingTransaction,
)
from synapse.storage.engines._base import IsolationLevel
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    MultiWriterIdGenerator,
-)
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import (
JsonDict,
JsonMapping,
@@ -76,7 +73,7 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
-        self._receipts_id_gen: AbstractStreamIdGenerator
+        self._receipts_id_gen: MultiWriterIdGenerator

self._can_write_to_receipts = (
self._instance_name in hs.config.worker.writers.receipts
@@ -136,6 +133,9 @@ def get_max_receipt_stream_id(self) -> MultiWriterStreamToken:
def get_receipt_stream_id_for_instance(self, instance_name: str) -> int:
return self._receipts_id_gen.get_current_token_for_writer(instance_name)

def get_receipts_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._receipts_id_gen

def get_last_unthreaded_receipt_for_user_txn(
self,
txn: LoggingTransaction,
11 changes: 5 additions & 6 deletions synapse/storage/databases/main/room.py
@@ -59,11 +59,7 @@
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import (
-    AbstractStreamIdGenerator,
-    IdGenerator,
-    MultiWriterIdGenerator,
-)
+from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
@@ -151,7 +147,7 @@ def __init__(

self.config: HomeServerConfig = hs.config

-        self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
+        self._un_partial_stated_rooms_stream_id_gen: MultiWriterIdGenerator

self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
@@ -1409,6 +1405,9 @@ def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
instance_name
)

def get_un_partial_stated_rooms_id_generator(self) -> MultiWriterIdGenerator:
return self._un_partial_stated_rooms_stream_id_gen

async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int, room_ids: Collection[str]
) -> Set[str]:
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/stream.py
@@ -577,6 +577,9 @@ def get_room_max_token(self) -> RoomStreamToken:

return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))

def get_events_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._stream_id_gen

async def get_room_events_stream_for_rooms(
self,
room_ids: Collection[str],
5 changes: 5 additions & 0 deletions synapse/storage/util/id_generators.py
@@ -812,6 +812,11 @@ def _update_stream_positions_table_txn(self, txn: Cursor) -> None:
pos = self.get_current_token_for_writer(self._instance_name)
txn.execute(sql, (self._stream_name, self._instance_name, pos))

async def get_max_allocated_token(self) -> int:
return await self._db.runInteraction(
"get_max_allocated_token", self._sequence_gen.get_max_allocated
)


@attr.s(frozen=True, auto_attribs=True)
class _AsyncCtxManagerWrapper(Generic[T]):
24 changes: 24 additions & 0 deletions synapse/storage/util/sequence.py
@@ -88,6 +88,10 @@ def check_consistency(
"""
...

@abc.abstractmethod
def get_max_allocated(self, txn: Cursor) -> int:
"""Get the maximum ID that we have allocated"""


class PostgresSequenceGenerator(SequenceGenerator):
"""An implementation of SequenceGenerator which uses a postgres sequence"""
@@ -190,6 +194,17 @@ def check_consistency(
% {"seq": self._sequence_name, "stream_name": stream_name}
)

def get_max_allocated(self, txn: Cursor) -> int:
# We just read from the sequence what the last value we fetched was.
txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}")
row = txn.fetchone()
assert row is not None

last_value, is_called = row
if not is_called:
last_value -= 1
return last_value


GetFirstCallbackType = Callable[[Cursor], int]

@@ -248,6 +263,15 @@ def check_consistency(
# There is nothing to do for in memory sequences
pass

def get_max_allocated(self, txn: Cursor) -> int:
with self._lock:
if self._current_max_id is None:
assert self._callback is not None
self._current_max_id = self._callback(txn)
self._callback = None

return self._current_max_id


def build_sequence_generator(
db_conn: "LoggingDatabaseConnection",
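
The `- 1` in the Postgres implementation follows from standard sequence semantics: `last_value` only names a value that has actually been handed out once `is_called` is true; before the first `nextval()` call, `is_called` is false and `last_value` holds the value the *next* call will return. A small illustration of that rule (assuming stock Postgres behaviour; not code from this commit):

# CREATE SEQUENCE s;        -- last_value = 1, is_called = false
# SELECT nextval('s');      -- returns 1; now last_value = 1, is_called = true
def max_allocated(last_value: int, is_called: bool) -> int:
    # Mirrors PostgresSequenceGenerator.get_max_allocated above.
    return last_value if is_called else last_value - 1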
64 changes: 63 additions & 1 deletion synapse/streams/events.py
@@ -30,7 +30,12 @@
from synapse.handlers.typing import TypingNotificationEventSource
from synapse.logging.opentracing import trace
from synapse.streams import EventSource
-from synapse.types import MultiWriterStreamToken, StreamKeyType, StreamToken
+from synapse.types import (
+    AbstractMultiWriterStreamToken,
+    MultiWriterStreamToken,
+    StreamKeyType,
+    StreamToken,
+)

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -91,6 +96,63 @@ def get_current_token(self) -> StreamToken:
)
return token

async def bound_future_token(self, token: StreamToken) -> StreamToken:
"""Bound a token that is ahead of the current token to the maximum
persisted values.

This ensures that if we wait for the given token we know the stream will
eventually advance to that point.

This works around a bug where older Synapse versions will give out
tokens for streams, and then after a restart will give back tokens where
the stream has "gone backwards".
"""

current_token = self.get_current_token()

stream_key_to_id_gen = {
StreamKeyType.ROOM: self.store.get_events_stream_id_generator(),
StreamKeyType.PRESENCE: self.store.get_presence_stream_id_gen(),
StreamKeyType.RECEIPT: self.store.get_receipts_stream_id_gen(),
StreamKeyType.ACCOUNT_DATA: self.store.get_account_data_id_generator(),
StreamKeyType.PUSH_RULES: self.store.get_push_rules_stream_id_gen(),
StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(),
StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(),
StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(),
}

for _, key in StreamKeyType.__members__.items():
if key == StreamKeyType.TYPING:
# Typing stream is allowed to "reset", and so comparisons don't
# really make sense as is.
# TODO: Figure out a better way of tracking resets.
continue

token_value = token.get_field(key)
current_value = current_token.get_field(key)

if isinstance(token_value, AbstractMultiWriterStreamToken):
assert type(current_value) is type(token_value)

if not token_value.is_before_or_eq(current_value): # type: ignore[arg-type]
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()

token = token.copy_and_replace(
key, token_value.bound_stream_token(max_token)
)
else:
assert isinstance(current_value, int)
if current_value < token_value:
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()

token = token.copy_and_replace(key, min(token_value, max_token))

return token

@trace
async def get_start_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the start token for a given room to be used to paginate
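
Note the two branches in `bound_future_token`: multi-writer tokens (rooms, receipts) are clamped per writer via `bound_stream_token`, while plain integer positions are clamped with `min`. A rough picture of the integer case, with made-up numbers (not code from this commit):

# The worker's current account_data position is 100 and the token asks for
# 130, but only 110 was ever allocated in the DB (the rest was lost on
# restart, so the stream will never reach 130 on its own).
token_value, current_value, max_token = 130, 100, 110
assert current_value < token_value        # the token is from the "future"
bounded = min(token_value, max_token)     # 110: a position the stream will reach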
18 changes: 18 additions & 0 deletions synapse/types/__init__.py
@@ -536,6 +536,16 @@ def is_before_or_eq(self, other_token: Self) -> bool:

return True

def bound_stream_token(self, max_stream: int) -> "Self":
"""Bound the stream positions to a maximum value"""

return type(self)(
stream=min(self.stream, max_stream),
instance_map=immutabledict(
{k: min(s, max_stream) for k, s in self.instance_map.items()}
),
)


@attr.s(frozen=True, slots=True, order=False)
class RoomStreamToken(AbstractMultiWriterStreamToken):
@@ -722,6 +732,14 @@ async def to_string(self, store: "DataStore") -> str:
else:
return "s%d" % (self.stream,)

def bound_stream_token(self, max_stream: int) -> "RoomStreamToken":
"""See super class"""

# This only makes sense for stream tokens.
assert self.topological is None

return super().bound_stream_token(max_stream)


@attr.s(frozen=True, slots=True, order=False)
class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
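
`bound_stream_token` clamps both the overall stream position and every per-writer position in the instance map; the `RoomStreamToken` override additionally asserts there is no topological part, since the clamp only makes sense for pure stream tokens. A small sketch of the effect, with made-up positions (plain dicts stand in for the `immutabledict` instance map):

stream, instance_map, max_stream = 7, {"writer1": 9, "writer2": 5}, 6
bounded_stream = min(stream, max_stream)  # 7 -> 6
bounded_map = {k: min(s, max_stream) for k, s in instance_map.items()}
assert (bounded_stream, bounded_map) == (6, {"writer1": 6, "writer2": 5})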